跨Mysql、Redis、Mongo的分布式事务
Mysql、Redis、Mongo都是非常流行的存储,并且各自有自己的优势。在实际的应用中,常常会同时使用多种存储,也会遇见在多种存储中保证数据一致性的需求,例如保证数据库中的库存和Redis中的库存一致等。
本文基于分布式事务框架 https://github.com/dtm-labs/dtm 给出了一个跨Mysql、Redis、Mongo多种存储引擎的一个可运行的分布式事务实例,希望能够帮助大家解决这方面的问题。
这种灵活的组合多个存储引擎形成一个分布式事务的能力,也是dtm首创做到的,目前未看到其他的分布式事务框架有这样的能力。
问题场景
我们先来看问题场景,假定现在用户参加一次活动,将自己的余额,充值话费,同时活动会赠送商城积分。其中余额存储在Mysql,话费保存在Redis,商城积分保存在Mongo,并且由于活动限时,因此可能出现参加活动失败的情况,所以需要支持回滚。
对于上述问题场景,可以使用DTM的Saga事务,下面我们就来详细讲解方案。
准备数据
如果您想要自己在本地准备相关的数据环境,可以通过 https://github.com/dtm-labs/dtm/blob/main/helper/compose.store.yml 启动Mysql、Redis、Mongo,然后通过https://github.com/dtm-labs/dtm/tree/main/sqls下面的脚本准备本例子的数据,其中busi.*
为业务数据,barrier.*
为DTM使用的辅助表
编写业务代码
我们先看最熟悉的Mysql的业务代码
func SagaAdjustBalance(db dtmcli.DB, uid int, amount int) error {
_, err := dtmimp.DBExec(db, "update dtm_busi.user_account set balance = balance + ? where user_id = ?", amount, uid)
return err
}
这段代码主要是进行数据库中用户余额的调整
对于Saga事务模式来说,当我们回滚时,我们需要反向调整余额,这部分的处理,我们可以依旧调用上述的SagaAdjustBalance
,只需要传入负数的金额即可。
对于Redis和Mongo,业务代码的处理也是类似的,只需要对相应的余额进行增减即可
如何做幂等
对于Saga事务模式来说,当我们的子事务服务出现临时故障,出现故障就会进行重试,这个故障可能出现在子事务提交前,也可能出现在子事务提交之后,因此子事务服务就需要做到幂等。
DTM 提供了辅助表和辅助的函数,用于帮助用户快速实现幂等。对于Mysql,他会在业务数据库中创建辅助表barrier,当用户开启事务调整余额时,会先在barrier表中写入gid,如果这是一个重复请求,那么写入gid时,会发现重复而失败,此时跳过用户业务上的余额调整,保证幂等。辅助函数的使用代码如下:
app.POST(BusiAPI+"/SagaBTransIn", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, reqFrom(c).Amount, reqFrom(c).TransInResult)
})
}))
Mongo处理幂等的原理与Mysql相近,不再赘述
Redis处理幂等的原理与Mysql不同,主要是因为事务的原理不同。Redis的事务主要是通过lua的原子执行来保证的。DTM的辅助函数会通过lua脚本来调整余额,调整余额前,会在redis中查询gid,如果存在,则跳过业务上的余额调整;如果不存在,则执行业务上的余额调整。辅助函数的使用代码如下:
app.POST(BusiAPI+"/SagaRedisTransOut", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), -reqFrom(c).Amount, 7*86400)
}))
如何做补偿
对于Saga来说,我们还需要处理补偿操作,但补偿操作并不是简单的反向调整,也有很多坑需要注意,否则很容易补偿出错。
一方面,补偿需要考虑幂等,因为在补偿过程中,也同样需要考虑故障重试的情况,与前一小节中的幂等处理一样。另一方面,补偿还需要考虑空补偿,因为正向分支返回失败,这个失败可能是在正向的数据已经调整完成提交之后的失败,也可能是还没有提交就返回了失败。对于数据已提交的失败,我们需要执行反向操作,对于数据未提交的失败,我们需要跳过反向操作,即处理空补偿。
DTM 提供的辅助表与辅助函数中,一方面会根据正向操作插入的gid判断是否为空补偿,另一方面还会再插入gid+'compensate',判断补偿是否为重复操作。如果是正常补偿操作,那么会执行业务上的补偿,如果是空补偿或者重复补偿,则会跳过补偿业务上的补偿。
Mysql的代码如下:
app.POST(BusiAPI+"/SagaBTransInCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).Call(txGet(), func(tx *sql.Tx) error {
return SagaAdjustBalance(tx, TransInUID, -reqFrom(c).Amount, "")
})
}))
Redis的代码如下:
app.POST(BusiAPI+"/SagaRedisTransOutCom", dtmutil.WrapHandler2(func(c *gin.Context) interface{} {
return MustBarrierFromGin(c).RedisCheckAdjustAmount(RedisGet(), GetRedisAccountKey(TransOutUID), reqFrom(c).Amount, 7*86400)
}))
补偿代码与前面的正向操作代码几乎一样,仅仅是把金额乘以-1。DTM 的辅助函数会在一个函数内部同时包含了幂等与补偿的相关逻辑
其他异常
编写子事务以及子事务的补偿时,其实还有一种异常情况是悬挂,可能出现在全局事务超时回滚,或者重试到达上线后回滚,正常情况是先正向操作再补偿,但是极端情况可能出现先补偿再正向操作,因此正向操作还需要判断补偿是否已执行,已执行的情况下,也需要跳过业务操作。
对于DTM的用户而言,这些异常都已经被优雅的妥善处理,您作为用户,只需要按照上述的MustBarrierFromGin(c).Call
进行调用即可,完全不再需要关心这些异常。DTM 处理这些异常的原理在这里进行了详细的讲述:异常与子事务屏障
发起分布式事务
前面编写完了各个子事务服务,下面这部分代码发起一个Saga全局事务:
saga := dtmcli.NewSaga(dtmutil.DefaultHTTPServer, dtmcli.MustGenGid(dtmutil.DefaultHTTPServer)).
Add(busi.Busi+"/SagaBTransOut", busi.Busi+"/SagaBTransOutCom", &busi.TransReq{Amount: 50}).
Add(busi.Busi+"/SagaMongoTransIn", busi.Busi+"/SagaMongoTransInCom", &busi.TransReq{Amount: 30}).
Add(busi.Busi+"/SagaRedisTransIn", busi.Busi+"/SagaRedisTransOutIn", &busi.TransReq{Amount: 20})
err := saga.Submit()
在这部分代码中,创建了一个Saga全局事务,该Saga事务包括3个子事务:
从Mysql中转出50
向Mongo中转入30
向Redis中转入20
在整个事务过程中,如果所有的子事务都顺利完成,那么全局事务成功;如果有一个子事务返回了业务上的失败,那么全局事务回滚。
运行
如果您想要完整运行一个上面的示例,步骤如下:
运行dtm
git clone https://github.com/dtm-labs/dtm && cd dtm
go run main.go
运行成功的例子
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb
运行失败的例子
git clone https://github.com/dtm-labs/dtm-examples && cd dtm-examples
go run main.go http_saga_multidb_rollback
您可以对例子进行修改,模拟各种临时的故障,空补偿的情况,以及其他各种异常,当整个全局事务最终完成时,数据是一致的。
小结
本文给出了一个跨Mysql、Redis、Mongo的分布式事务例子,详细讲解了其中需要处理的问题,以及解决方案。
本文的原理适合于所有支持ACID事务的存储引擎,您可以将它快速扩展,用于其他引擎,例如TiKV等。