vlambda博客
学习文章列表

基于go实现mysql数据订阅

背景

目前业务系统使用的数据库主要以mysql为主。在一些场景下我们需要把业务数据增量的同步到其他存储平台,来满足我们的业务诉求,如同步redis,ES,数据仓库等。

实现业务数据同步的方式

  1. 使用定时任务周期性的将变更数据更新到ES中这需要每个表中增加一个记录变更时间戳,亦或是定时全量更新ES;缺点:效率低,有延迟;

  2. 通过双/多写的方式:在业务写mysql的同时也将数据写一份到ES中,亦或是发送一个数据变更消息给消息队列,再通过消费者的方式同步给ES,缺点:对业务代码入侵;

除了以上的两个方式,这里主要研究和使用通过binlog 来实现mysql数据变更的订阅,可以及时高效完成业务数据的同步。这个实现有点类似mongodb changestream 特性,但它不是 mysql自带的需要自己业务实现。以下主要分析和对比通过dump binlog的方式实现数据同步:

社区开源工具对比

类目 Canal mysql_stream Maxwell
开发人 阿里 zendesk yelp
开发语言 java java python
活跃度
HA 支持 定制 支持
数据落地 定制 kafka等 kafka
分区 支持 不支持 不支持
bootstrap 不支持 支持 支持
数据格式 自定义 json json
随机读取 不支持 支持 支持
使用文档 较详细 较详细 json

在各大云平台(腾讯云/华为云/阿里云)也都提供了dts(数据传输服务)服务。目前腾讯云上的DTS数据订阅服务,仅支持腾讯云上的数据库使用。DTS成立了oteam项目,可以满足私有化部署来支持非云上的数据库实现数据订阅。通过dump binlog 将数据库记录,以protobuffer协议序列化后发送到kafka中,通过consumer 订阅数据变更。以pb协议作为数据协议,这样数据占用空间更小,编解码速度更快。DTS支持按表和按表+主键的分区方式,使用表+主键的多分区方式,能够保证同一条记录变更被同一个group消费的有序性。

接下来我们来研究一个使用golang 开发的可以处理的mysql network和replication的库,它可以帮助我们实现mysql的数据订阅。腾讯云的DTS 中mysql的数据订阅的实现也使用了这个库。来一起看下这个库的实现和使用。

实现原理

  1. 使用 github.com/go-mysql-org/go-mysql库实现

  2. canal 实现原理 

    2.1 canal通过dump协议向master请求binlog信息;

    2.2 获取到binlog信息后,解析binlog 然后执行handler的对应的方法;

      2.3 go-mysql binlog 同步 支持GTID和 file position方式;

建议使用mysql主从同步方式中的异步的同步方式而非同步和半同步方式,不需要向master同步commit状态,所以对master的读写性能没有影响;

使用限制

  1. 支持mysql、mariadb

  2. 数据库开启binlog,binlog format 必须为row模式(row binlog 记录了每行数据的变更,通过dump binlog 可以直接订阅数据。)

使用说明

  1. 创建复制账号

create user '账号' IDENTIFIED BY '账号密码'; 
grant SELECT, REPLICATION CLIENT,REPLICATION SLAVE,PROCESS on *.* to '账号'@'%';
flush privileges;

(备注:如果创建的账号没有FLUSH TABLES WITH READ LOCK 权限,canal dump 配置项的 SkipMasterData 需要配置为true,默认为false)

mysql binlog 检查

1、检查binlog format

# 检查是否开启binlog SHOW VARIABLES LIKE '%log_bin%'; 
mysql> SHOW VARIABLES LIKE '%log_bin%';
# 检查binlog format show variables like 'binlog_format';
mysql> show variables like 'binlog_format' ;

2、开启binlog,打开mysql 配置文件,增加如下配置

log-bin=mysql-bin

log-bin-index=mysql-bin.index

binlog_format=ROW

3、动态调整binlog format,如果已经开启了binlog 但 format 不为row 可以动态调整。为了保证mysql重启,设置不丢失需要同时修改mysql 配置:binlog_format=ROW。

# 在数据库连接会话里设置 
set global binlog_format = ROW;
# 检查修改是否生效
select @@global.binlog_format;

canal 数据订阅demo

1、实现一个EventHander

package hander

import (
"github.com/go-mysql-org/go-mysql/canal"
"github.com/siddontang/go-log/log"
)
type DemoHander struct {
canal.DummyEventHandler
}
func (h *DemoHander) OnRow(e *canal.RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows)
return nil
}
func (h *DemoHander) String() string {
return "MyEventHandler"
}

2、启动一个canal实例同步数据库数据

package main

import (
"canal-demo/hander"
"flag"
"fmt"
"github.com/go-mysql-org/go-mysql/canal"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/siddontang/go-log/log"
)
var (
helpFlag bool
dbHost string
dbPort int
dbUser string
dbPwd string
)
func init() {
flag.StringVar(&dbHost, "host", "", "db host")
flag.IntVar(&dbPort, "port", 3306, "db port")
flag.StringVar(&dbUser, "user", "", "db user name")
flag.StringVar(&dbPwd, "pwd", "", "db passwowrd")
}
func main() {
flag.Parse()
if helpFlag {
flag.Usage()
return
}
fmt.Println("begin")
cfg := canal.NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:%d", dbHost, dbPort)
cfg.User = dbUser
cfg.Password = dbPwd
// 指定同步binlog 前需要同步的表数据
// 没有FLUSH TABLES WITH READ LOCK 权限,SkipMasterData需要配置为true
cfg.Dump.SkipMasterData = true
cfg.Dump.TableDB = "ieg_ctp_settlement"
cfg.Dump.Tables = []string{"budget"}
c, err := canal.NewCanal(cfg)
if err != nil {
log.Fatal(err)
}
// Register a handler to handle RowsEvent
c.SetEventHandler(&hander.DemoHander{})
// Start canal
pos := mysql.Position{Name: "", Pos: 0}
c.RunFrom(pos)
}

canal 核心逻辑

事件处理器 EventHandler

canal.DummyEventHandler 实现了EventHandler 接口的默认实现(空实现)。这些接口都是hook接口,从master获取到最新的binlog后,canal会根据不同的事件分类,调用对应的事件方法。可以通过注册自定义的EventHandler,来获取数据库的变更记录,然后同步到我们想要同步的任何地方。

EventHandler 接口定义如下:

type EventHandler interface {
OnRotate(roateEvent *replication.RotateEvent) error
OnTableChanged(schema string, table string) error
OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
OnXID(nextPos mysql.Position) error
OnGTID(gtid mysql.GTIDSet) error
OnPosSynced(pos mysql.Position, set mysql.GTIDSet, force bool) error
String() string
}

DummyEventHandler

提供了EventHandler的默认实现;

type DummyEventHandler struct {
}
func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error { return nil }
func (h *DummyEventHandler) OnTableChanged(schema string, table string) error { return nil }
func (h *DummyEventHandler) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error {
return nil
}
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
func (h *DummyEventHandler) OnPosSynced(mysql.Position, mysql.GTIDSet, bool) error { return nil }
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }

binlog 同步与解析

canal.RunFrom 方法主要有两个动作:

1、如果配置了dump 相关的库表信息就从 master中同步已有的数据;

2、启动循环开始同步binlog,直到方法异常退出或终断运行;

这两个动作主要在Canal的run方法中完成代码如下:

func (c *Canal) run() error {
defer func() {
c.cancel()
}()
c.master.UpdateTimestamp(uint32(time.Now().Unix()))
// dump 初始化master中已有数据
if !c.dumped {
c.dumped = true
err := c.tryDump()
close(c.dumpDoneCh)
if err != nil {
log.Errorf("canal dump mysql err: %v", err)
return errors.Trace(err)
}
}
// 开始同步binlog
if err := c.runSyncBinlog(); err != nil {
if errors.Cause(err) != context.Canceled {
log.Errorf("canal start sync binlog err: %v", err)
return errors.Trace(err)
}
}
return nil
}

binlog 同步、解析、封装、分发在 Canal的runSyncBinLog 方法中完成。为了聚焦处理流程,下面展示的代码删除了代码实现细节,仅保留了主要处理流程。

func (c *Canal) runSyncBinlog() error {
// 创建BinLogStream,方法内部创建了单独的协程来同步binlog,并解析封装为BinlogEvent
// 类型,然后投递到BinLogStream的ch 字段中,ch 为chan *BinLogEvent类型。
s, err := c.startSyncer()
for {
// 从BinLogStream的ch中获取BinLogEvent类型的数据
ev, err := s.GetEvent(c.ctx)
if err != nil {
return errors.Trace(err)
}
// 根据事件类型调用注册的EventHandler的hook方法
switch e := ev.Event.(type) {
case *replication.RotateEvent:
// 调用 EventHandler的OnRotate方法;
case *replication.RowsEvent:
// 调用 EventHandler的OnRows方法;
case *replication.XIDEvent:
// 调用 EventHandler的OnXID方法;
case *replication.MariadbGTIDEvent:
// 调用 EventHandler的OnGTID方法;
case *replication.GTIDEvent:
// 调用 EventHandler的OnGTID方法;
case *replication.QueryEvent:
// 调用 EventHandler的OnDLL方法;
// 目前仅能处理table changed DLL
default:
continue
}
if savePos {
// 更新binlog位置信息
}
return nil
}

 runSyncBinlog 方法的内部处理逻辑示意图如下:

在了解了canal实现的主要逻辑后,那为什么canal要限定binlog format为row的方式?先来了解下 mysql binlog 都支持哪些格式以及不同格式之间的区别。

mysql支持的三种binlog format

  1. STATEMENT模式(SBR:基于SQL语句的复制)

    1.1 每一条会修改数据的sql语句会记录到binlog中。

    1.2 优点:并不需要记录每一条sql语句和每一行的数据变化。减少了binlog日志量,节约IO,提高性能。

    1.3 缺点:在某些情况下会导致master-slave中的数据不一致

  2. ROW模式(RBR:基于行的复制)

    2.1 binlog中记录数据修改记录

    2.2 优点:不会出现STATEMENT模式下某些情况下的数据不一致问题,不需要在slave端解析执行sql;

    2.3 缺点:binlog 记录变多,特别是在执行sql后引起的批量数据更新,会生成大量的binlog记录

  3. MIXED模式(MBR: 混合模式复制)

    1.1 混合了STATEMENT和ROW两种模式

    1.2 优点:节约IO,提高性能。同时解决了纯STATEMENT模式下数据不一致的情况;

    1.3 缺点:在slave 上会执行sql,一些慢sql会影响slave的性能;

mysql的binlog format的配置基本都采用row模式,这种模式虽然在执行的sql会影响多行数据的场景下会导致大量binlog记录,从而增加IO负担,但这种方式更安全,也无需在slave上执行sql数据,能够尽可能的保证数据的一致性。在了解不同的binlog format 后,就很容易明白为什么go-mysql只支持binlog format 为row的模式。因为组件没有保存数据库的完整记录,也没有实现sql 执行引擎, 所以不支持binlog 为STATEMENT的方式。通过这样方式实现的数据订阅最方便、工作流也相对较低。

在了解了go-mysql canal 的实现原理和简单的使用后,那我们是不是要直接使用go-mysql 库实现自己的数据订阅呢?我们先来看看有没有一些直接可用的系统来帮助我们减少些工作。除了可以使用云平台的数据订阅服务外,这里再介绍一个github上的项目。

go-mysql-transfer

这里再介绍一个github上的一个基于go-mysql库实现的mysql数据库同步工具,支持多种类型的接收端(Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API)。

特性

  1. golang实现,部署简单;

  2. 集成多种接收端,如:Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ、HTTP API等,无需编写客户端,开箱即用;

  3. 内置丰富的数据解析、消息生成规则,支持模板语法;

  4. 支持Lua脚本扩展,可处理复杂逻辑,如:数据的转换、清洗;

  5. 集成Prometheus客户端,支持监控、告警;

  6. 集成Web Admin监控页面(备注:试用时发现web 页面有问题,web 接口可用,但页面访问有问题。而且web 接口的shutdown方法传参有问题,server.Shutdown 不能传null)

  7. 支持高可用集群部署

  8. 数据同步失败重试

  9. 支持全量数据初始化

使用说明

go-mysql-transfer 集成了多种接收端的同步,我们在使用时建议采用kafka的方式,这样可以借助kafka的订阅模式实现,一次同步多渠道订阅模式。其他服务通过订阅kafka topic 的方式同步数据。它支持通过配置文件配置同步实例,我们可以通过rainbow 来管理我们的配置信息。go-mysql-transfer 将binlog的postion 信息做了持久化保存(支持etcd、zk、boltdb存储),所以支持重启后的断点续传。

借助zk或etcd的leader选举,实现了多实例主从互备高可用。我们可以通过k8s deployment的方式单实例运行保证高可用,如果单实例运行会通过boltdb 保存binlog 信息,boltdb 文件放在本地,当容器运行的时,需要通过数据卷来保存boltdb,避免pod 重启导致binlog信息丢失。也可以扩展下存储介质。

针对具体的使用可以参考go-mysql-transfer使用文档:https://www.kancloud.cn/wj596/go-mysql-transfer/2064425

总结

在使用时,如果是腾讯云的mysql可以直接使用腾讯云提供的数据订阅服务。如果是其他平台的mysql可以私有化部署腾讯云的数据订阅服务,也可以直接使用go-mysql-transfer来实现,当实在满足不了需求可以使用canal自定义实现,有一定的工作量。