基于go实现mysql数据订阅
背景
目前业务系统使用的数据库主要以mysql为主。在一些场景下我们需要把业务数据增量的同步到其他存储平台,来满足我们的业务诉求,如同步redis,ES,数据仓库等。
实现业务数据同步的方式
使用定时任务周期性的将变更数据更新到ES中这需要每个表中增加一个记录变更时间戳,亦或是定时全量更新ES;缺点:效率低,有延迟;
通过双/多写的方式:在业务写mysql的同时也将数据写一份到ES中,亦或是发送一个数据变更消息给消息队列,再通过消费者的方式同步给ES,缺点:对业务代码入侵;
除了以上的两个方式,这里主要研究和使用通过binlog 来实现mysql数据变更的订阅,可以及时高效地完成业务数据的同步。这个实现有点类似mongodb changestream 特性,但它不是 mysql自带的需要自己业务实现。以下主要分析和对比通过dump binlog的方式实现数据同步:
社区开源工具对比
类目 | Canal | mysql_stream | Maxwell |
---|---|---|---|
开发人 | 阿里 | zendesk | yelp |
开发语言 | java | java | python |
活跃度 | 高 | 高 | 高 |
HA | 支持 | 定制 | 支持 |
数据落地 | 定制 | kafka等 | kafka |
分区 | 支持 | 不支持 | 不支持 |
bootstrap | 不支持 | 支持 | 支持 |
数据格式 | 自定义 | json | json |
随机读取 | 不支持 | 支持 | 支持 |
使用文档 | 较详细 | 较详细 | json |
在各大云平台(腾讯云/华为云/阿里云)也都提供了dts(数据传输服务)服务。目前腾讯云上的DTS数据订阅服务,仅支持腾讯云上的数据库使用。DTS成立了oteam项目,可以满足私有化部署来支持非云上的数据库实现数据订阅。通过dump binlog 将数据库记录,以protobuffer协议序列化后发送到kafka中,通过consumer 订阅数据变更。以pb协议作为数据协议,这样数据占用空间更小,编解码速度更快。DTS支持按表和按表+主键的分区方式,使用表+主键的多分区方式,能够保证同一条记录变更被同一个group消费的有序性。
接下来我们来研究一个使用golang 开发的可以处理的mysql network和replication的库,它可以帮助我们实现mysql的数据订阅。腾讯云的DTS 中mysql的数据订阅的实现也使用了这个库。来一起看下这个库的实现和使用。
实现原理
使用 github.com/go-mysql-org/go-mysql库实现
canal 实现原理
2.1 canal通过dump协议向master请求binlog信息;
2.2 获取到binlog信息后,解析binlog 然后执行handler的对应的方法;
2.3 go-mysql binlog 同步 支持GTID和 file position方式;
建议使用mysql主从同步方式中的异步的同步方式而非同步和半同步方式,不需要向master同步commit状态,所以对master的读写性能没有影响;
使用限制
支持mysql、mariadb
数据库开启binlog,binlog format 必须为row模式(row binlog 记录了每行数据的变更,通过dump binlog 可以直接订阅数据。)
使用说明
创建复制账号
create user '账号' IDENTIFIED BY '账号密码';
grant SELECT, REPLICATION CLIENT,REPLICATION SLAVE,PROCESS on *.* to '账号'@'%';
flush privileges;
(备注:如果创建的账号没有FLUSH TABLES WITH READ LOCK 权限,canal dump 配置项的 SkipMasterData 需要配置为true,默认为false)
mysql binlog 检查
1、检查binlog format
# 检查是否开启binlog SHOW VARIABLES LIKE '%log_bin%';
mysql> SHOW VARIABLES LIKE '%log_bin%';
# 检查binlog format show variables like 'binlog_format';
mysql> show variables like 'binlog_format' ;
2、开启binlog,打开mysql 配置文件,增加如下配置
log-bin=mysql-bin
log-bin-index=mysql-bin.index
binlog_format=ROW
3、动态调整binlog format,如果已经开启了binlog 但 format 不为row 可以动态调整。为了保证mysql重启,设置不丢失需要同时修改mysql 配置:binlog_format=ROW。
# 在数据库连接会话里设置
set global binlog_format = ROW;
# 检查修改是否生效
select @@global.binlog_format;
canal 数据订阅demo
1、实现一个EventHander
package hander
import (
"github.com/go-mysql-org/go-mysql/canal"
"github.com/siddontang/go-log/log"
)
type DemoHander struct {
canal.DummyEventHandler
}
func (h *DemoHander) OnRow(e *canal.RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows)
return nil
}
func (h *DemoHander) String() string {
return "MyEventHandler"
}
2、启动一个canal实例同步数据库数据
package main
import (
"canal-demo/hander"
"flag"
"fmt"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/siddontang/go-log/log"
)
var (
helpFlag bool
dbHost string
dbPort int
dbUser string
dbPwd string
)
func init() {
flag.StringVar(&dbHost, "host", "", "db host")
flag.IntVar(&dbPort, "port", 3306, "db port")
flag.StringVar(&dbUser, "user", "", "db user name")
flag.StringVar(&dbPwd, "pwd", "", "db passwowrd")
}
func main() {
flag.Parse()
if helpFlag {
flag.Usage()
return
}
fmt.Println("begin")
cfg := canal.NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:%d", dbHost, dbPort)
cfg.User = dbUser
cfg.Password = dbPwd
// 指定同步binlog 前需要同步的表数据
// 没有FLUSH TABLES WITH READ LOCK 权限,SkipMasterData需要配置为true
cfg.Dump.SkipMasterData = true
cfg.Dump.TableDB = "ieg_ctp_settlement"
cfg.Dump.Tables = []string{"budget"}
c, err := canal.NewCanal(cfg)
if err != nil {
log.Fatal(err)
}
// Register a handler to handle RowsEvent
c.SetEventHandler(&hander.DemoHander{})
// Start canal
pos := mysql.Position{Name: "", Pos: 0}
c.RunFrom(pos)
}
canal 核心逻辑
事件处理器 EventHandler
canal.DummyEventHandler 实现了EventHandler 接口的默认实现(空实现)。这些接口都是hook接口,从master获取到最新的binlog后,canal会根据不同的事件分类,调用对应的事件方法。可以通过注册自定义的EventHandler,来获取数据库的变更记录,然后再同步到我们想要同步的任何地方。
EventHandler 接口定义如下:
type EventHandler interface {
OnRotate(roateEvent *replication.RotateEvent) error
OnTableChanged(schema string, table string) error
OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
OnXID(nextPos mysql.Position) error
OnGTID(gtid mysql.GTIDSet) error
OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
String() string
}
DummyEventHandler
提供了EventHandler的默认实现;
type DummyEventHandler struct {
}
func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error { return nil }
func (h *DummyEventHandler) OnTableChanged(schema string, table string) error { return nil }
func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
return nil
}
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error { return nil }
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
binlog 同步与解析
canal.RunFrom 方法主要有两个动作:
1、如果配置了dump 相关的库表信息就从 master中同步已有的数据;
2、启动循环开始同步binlog,直到方法异常退出或终断运行;
这两个动作主要在Canal的run方法中完成代码如下:
func (c *Canal) run() error {
defer func() {
c.cancel()
}()
c.master.UpdateTimestamp(uint32(time.Now().Unix()))
// dump 初始化master中已有数据
if !c.dumped {
c.dumped = true
err := c.tryDump()
close(c.dumpDoneCh)
if err != nil {
log.Errorf("canal dump mysql err: %v", err)
return errors.Trace(err)
}
}
// 开始同步binlog
if err := c.runSyncBinlog(); err != nil {
if errors.Cause(err) != context.Canceled {
log.Errorf("canal start sync binlog err: %v", err)
return errors.Trace(err)
}
}
return nil
}
binlog 同步、解析、封装、分发在 Canal的runSyncBinLog 方法中完成。为了聚焦处理流程,下面展示的代码删除了代码实现细节,仅保留了主要处理流程。
func (c *Canal) runSyncBinlog() error {
// 创建BinLogStream,方法内部创建了单独的协程来同步binlog,并解析封装为BinlogEvent
// 类型,然后投递到BinLogStream的ch 字段中,ch 为chan *BinLogEvent类型。
s, err := c.startSyncer()
for {
// 从BinLogStream的ch中获取BinLogEvent类型的数据
ev, err := s.GetEvent(c.ctx)
if err != nil {
return errors.Trace(err)
}
// 根据事件类型调用注册的EventHandler的hook方法
switch e := ev.Event.(type) {
case *replication.RotateEvent:
// 调用 EventHandler的OnRotate方法;
case *replication.RowsEvent:
// 调用 EventHandler的OnRows方法;
case *replication.XIDEvent:
// 调用 EventHandler的OnXID方法;
case *replication.MariadbGTIDEvent:
// 调用 EventHandler的OnGTID方法;
case *replication.GTIDEvent:
// 调用 EventHandler的OnGTID方法;
case *replication.QueryEvent:
// 调用 EventHandler的OnDLL方法;
// 目前仅能处理table changed DLL
default:
continue
}
if savePos {
// 更新binlog位置信息
}
return nil
}
runSyncBinlog 方法的内部处理逻辑示意图如下:
在了解了canal实现的主要逻辑后,那为什么canal要限定binlog format为row的方式?先来了解下 mysql binlog 都支持哪些格式以及不同格式之间的区别。
mysql支持的三种binlog format
STATEMENT模式(SBR:基于SQL语句的复制)
1.1 每一条会修改数据的sql语句会记录到binlog中。
1.2 优点:并不需要记录每一条sql语句和每一行的数据变化。减少了binlog日志量,节约IO,提高性能。
1.3 缺点:在某些情况下会导致master-slave中的数据不一致
ROW模式(RBR:基于行的复制)
2.1 binlog中记录数据修改记录
2.2 优点:不会出现STATEMENT模式下某些情况下的数据不一致问题,不需要在slave端解析执行sql;
2.3 缺点:binlog 记录变多,特别是在执行sql后引起的批量数据更新,会生成大量的binlog记录
MIXED模式(MBR: 混合模式复制)
1.1 混合了STATEMENT和ROW两种模式
1.2 优点:节约IO,提高性能。同时解决了纯STATEMENT模式下数据不一致的情况;
1.3 缺点:在slave 上会执行sql,一些慢sql会影响slave的性能;
mysql的binlog format的配置基本都采用row模式,这种模式虽然在执行的sql会影响多行数据的场景下会导致大量binlog记录,从而增加IO负担,但这种方式更安全,也无需在slave上执行sql数据,能够尽可能的保证数据的一致性。在了解到不同的binlog format 后,就很容易明白为什么go-mysql只支持binlog format 为row的模式。因为组件没有保存数据库的完整记录,也没有实现sql 执行引擎, 所以不支持binlog 为STATEMENT的方式。通过这样方式实现的数据订阅最方便、工作流也相对较低。
在了解了go-mysql canal 的实现原理和简单的使用后,那我们是不是要直接使用go-mysql 库实现自己的数据订阅呢?我们先来看看有没有一些直接可用的系统来帮助我们减少些工作。除了可以使用云平台的数据订阅服务外,这里再介绍一个github上的项目。
go-mysql-transfer
这里再介绍一个github上的一个基于go-mysql库实现的mysql数据库同步工具,支持多种类型的接收端(Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API)。
特性
golang实现,部署简单;
集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,无需编写客户端,开箱即用;
内置丰富的数据解析、消息生成规则,支持模板语法;
支持Lua脚本扩展,可处理复杂逻辑,如:数据的转换、清洗;
集成Prometheus客户端,支持监控、告警;
集成Web Admin监控页面(备注:试用时发现web 页面有问题,web 接口可用,但页面访问有问题。而且web 接口的shutdown方法传参有问题,server.Shutdown 不能传null)
支持高可用集群部署
数据同步失败重试
支持全量数据初始化
使用说明
go-mysql-transfer 集成了多种接收端的同步,我们在使用时建议采用kafka的方式,这样可以借助kafka的订阅模式实现,一次同步多渠道订阅模式。其他服务通过订阅kafka topic 的方式同步数据。它支持通过配置文件配置同步实例,我们可以通过rainbow 来管理我们的配置信息。go-mysql-transfer 将binlog的postion 信息做了持久化保存(支持etcd、zk、boltdb存储),所以支持重启后的断点续传。
借助zk或etcd的leader选举,实现了多实例主从互备高可用。我们可以通过k8s deployment的方式单实例运行保证高可用,如果单实例运行会通过boltdb 保存binlog 信息,boltdb 文件放在本地,当容器运行的时,需要通过数据卷来保存boltdb,避免pod 重启导致binlog信息丢失。也可以扩展下存储介质。
针对具体的使用可以参考go-mysql-transfer使用文档:https://www.kancloud.cn/wj596/go-mysql-transfer/2064425
总结
在使用时,如果是腾讯云的mysql可以直接使用腾讯云提供的数据订阅服务。如果是其他平台的mysql可以私有化部署腾讯云的数据订阅服务,也可以直接使用go-mysql-transfer来实现,当实在满足不了需求时可以使用canal自定义实现,有一定的工作量。