分布式|如何通过事务消息保障抢购业务的分布式一致性?
前言
事务一致性原理回顾
脏读:事务 A 读到了事务 B 还没有提交的数据。
不可重复读:在一个事务里面对某个数据读取了两次,读出来的数据不一致。
幻读:在一个事务对某个数据集用同样的方式读取了两次,数据集的条目数量不一致。
READ_UNCOMMITTED(读未提交):最低的隔离级别,可以读到未提交的数据,无法解决脏读、不可重复读、幻读中的任何一种。
READ_COMMITED (读已提交):能够防止脏读,但是无法解决不可重复读和幻读的问题。
REPEATABLE_READ (重复读取):对同一条数据的多次重复读取能保持一致,解决了脏读、不可重复读的问题,但是幻读的问题还是无法解决。
SERLALIZABLE ( 串行化):最高的事务隔离级别,避免了事务的并行执行,解决了脏读、不可重复读和幻读的问题,但性能最低。
抢购业务中的分布式事务
分布式事务的实现方式
传统分布式事务
提供了强一致性保证,在业务执行的任何时间点都能确保事务一致性。
使用简单。常见的关系型数据库都提供了对XA协议的支持,通过引入事务协调器,业务代码跟使用单机事务相比基本上没有差别。
柔性事务
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
事务消息原理分析
抢购业务场景拆解
引入消息异步通知机制
先执行本地事务,还是先发送异步消息?
如何确保远程事务能执行成功?
消息队列集群在将异步消息投递到远程事务参与方的时候,由于网络不稳定,消息没能投递成功。
消息投递成功了,但远程事务参与方还没来得及执行远程事务,就宕机了。
完整流程
事务消息实战
消息队列 RocketMQ
开通 RocketMQ 服务
创建资源
本地事务参与方的业务代码
1、初始化 TransactionProducer
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.7.2.Final</version></dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.7</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>2.13.1</version></dependency>
TransactionProducer,用于异步消息的发送,需要填入如下信息:
-
Group ID:之前创建的用于本地事务参与方的 Group ID。 -
Access key和Secret Key:RAM 用户对应的密钥信息,从 RAM 用户控制台获得。 -
Nameserver Address:RocketMQ 实例的接入点信息,从 RocketMQ 控制台获得。
Properties properties = new Properties();// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。properties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, "XXX");// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");// LocalTransactionCheckerImpl本地事务回查类的实现TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());producer.start();
2、获取全局唯一的交易流水号
3、实现本地事务回查逻辑
LocalTransactionChecker接口的LocalTransactionCheckerImpl类,实现其中的check(Message)方法,该方法返回本地事务的最终状态。至于具体的业务逻辑如何实现,不在本文讨论的范围之前,我们将其封装在BusinessService类中。
package transaction;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class LocalTransactionCheckerImpl implements LocalTransactionChecker {private static Logger LOGGER = LoggerFactory.getLogger(LocalTransactionCheckerImpl.class);private static BusinessService businessService = new BusinessService();public TransactionStatus check(Message msg) {// 从消息体中获得的交易IDString transactionKey = msg.getKey();TransactionStatus transactionStatus = TransactionStatus.Unknow;try {boolean isCommit = businessService.checkbusinessService(transactionKey);if (isCommit) {transactionStatus = TransactionStatus.CommitTransaction;} else {transactionStatus = TransactionStatus.RollbackTransaction;}} catch (Exception e) {LOGGER.error("Transaction Key:{}", transactionKey, e);}LOGGER.warn("Transaction Key:{}transactionStatus:{}", transactionKey, transactionStatus.name());return transactionStatus;}}
4、执行本地事务并发送异步消息
LocalTransactionExecuter接口的匿名类,通过send方法进行发送,这就是本地事务参与方所需要实现的所有业务代码了。当然,这个匿名类实现了TransactionStatus execute.execute()方法,其中包含了对于本地事务的执行。完整代码如下:
package transaction;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.ONSFactory;import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.aliyun.openservices.ons.api.SendResult;import com.aliyun.openservices.ons.api.transaction.TransactionProducer;import com.aliyun.openservices.ons.api.transaction.TransactionStatus;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Properties;import java.util.concurrent.TimeUnit;public class TransactionProducerClient {private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);private static final BusinessService businessService = new BusinessService();private static final String TOPIC = "create_order";private static final TransactionProducer producer = null;static {Properties properties = new Properties();// 您在控制台创建的Group ID。注意:事务消息的Group ID不能与其他类型消息的Group ID共用。properties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, "XXX");// AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置TCP接入域名,进入消息队列RocketMQ版控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");// LocalTransactionCheckerImpl本地事务回查类的实现TransactionProducer producer = ONSFactory.createTransactionProducer(properties,new LocalTransactionCheckerImpl());producer.start();}public static void main(String[] args) throws InterruptedException {String transactionKey = getGlobalTransactionKey();String messageContent = String.format("lock inventory for: %s", transactionKey);Message message = new Message(TOPIC, null, transactionKey, messageContent.getBytes());try {SendResult sendResult = producer.send(message, (msg, arg) -> {// 此处用Lambda表示,实际是实现TransactionStatus execute(final Message msg, final Object arg)方法TransactionStatus transactionStatus = TransactionStatus.Unknow;try {boolean localTransactionOK = businessService.execbusinessService(transactionKey);if (localTransactionOK) {transactionStatus = TransactionStatus.CommitTransaction;} else {transactionStatus = TransactionStatus.RollbackTransaction;}} catch (Exception e) {LOGGER.error("Transaction Key:{}", transactionKey, e);}LOGGER.warn("Transaction Key:{}", transactionKey);return transactionStatus;}, null);LOGGER.info("send message OK, Transaction Key:{}, result:{}", message.getKey(), sendResult);} catch (Exception e) {LOGGER.info("send message failed, Transaction Key:{}", message.getKey());}// demo example防止进程退出TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);}private static String getGlobalTransactionKey() {// TODOreturn "";}}
远程事务参与方的业务代码
package transaction;import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.Consumer;import com.aliyun.openservices.ons.api.ONSFactory;import com.aliyun.openservices.ons.api.PropertyKeyConst;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.Properties;public class TransactionConsumerClient {private static Logger LOGGER = LoggerFactory.getLogger(TransactionProducerClient.class);private static final BusinessService businessService = new BusinessService();private static final String TOPIC = "create_order";private static final Consumer consumer = null;static {Properties properties = new Properties();// 在控制台创建的Group ID,不同于本地事务参与方使用的Group IDproperties.put(PropertyKeyConst.GROUP_ID, "XXX");// AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。properties.put(PropertyKeyConst.AccessKey, "XXX");// Accesskey Secret阿里云身份验证,在阿里云服RAM控制台创建。properties.put(PropertyKeyConst.SecretKey, "XXX");// 设置TCP接入域名,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");Consumer consumer = ONSFactory.createConsumer(properties);consumer.start();}public static void main(String[] args) {consumer.subscribe(TOPIC, "*", (message, context) -> {LOGGER.info("Receive: " + message);businessService.doBusiness(message);// 返回CommitMessage,代表给予消息队列集群异步消息已经得到正常处理的回馈return Action.CommitMessage;});}}
事务回滚
技术异常:远程事务参与方宕机、网络故障、数据库故障等。
业务异常:远程逻辑在业务上无法执行、代码业务逻辑错误等。
多个事务参与方
有可能发生业务异常的:比如锁定库存的操作,有可能因为库存不足而执行失败。又比如扣除积分的操作,有可能因为用户积分不足而无法扣除。
不太可能发生业务异常的:比如删除购物车条目的操作,除非是技术类故障,一定可以执行成功,即便对应的条目并不存在,也没有关系。又比如积分增加的操作,只要对应的用户没有注销,是不可能遇到业务异常的。
其他注意事项
消息幂等
每日对账
1、消息重试多次后,依然不成功:当消费者完全无法正常工作的时候,RocketMQ 不可能永无止境地重试消息,事实上,如果16次重试后异步消息依然没有办法被正常处理,RocketMQ 会停止尝试,将消息放到一个特殊的队列中。
2、未处理的业务异常:比如给某个账号加积分的时候,发现此账号被注销了,这是一个非常罕见的业务现象,有可能事先对此并没有健壮的处理机制。
3、幂等校验失败:处理幂等所依赖的系统比如 Redis 发生了故障,导致某些消息被重复处理。
4、其他严重的系统故障:比如网络长时间中断,留下了大量执行到一半的事务。
5、其他漏网之鱼。
