分布式事务 - AT模式Dubbo集成Seata
本篇基于Dubbo集成Seata实现一个分布式事务的解决方案,在整个业务流程中,会涉及如下三个服务:
订单服务:用于创建订单。
账户服务:从账户中扣减余额。
库存服务:扣减指定商品的库存数量。
下图是这三个微服务的整体架构图,用户执行下单请求时,会调用下单业务的REST接口,该接口会分别调用库存服务以及订单服务。另外,订单服务还会调用账户服务先进行资金冻结,整个流程涉及这三个服务的分布式事务问题。
项目准备
基于Spring Boot + Nacos + Dubbo构建项目,包含下面这些服务:
sample-order-service,订单服务
sample-repo-service,库存服务
sample-account-service,账户服务
sample-seata-common,公共服务组件
sample-rest-web,提供统一业务的REST接口 服务
其中sample-order-service、sample-repo-service、sample-account-service是基于Spring Boot + Dubbo构建的微服务,sample-rest-web提供统一的业务服务入口,sample-seata-common提供公共组件。
数据库准备
创建三个数据库:seata_order、seata_repo、seata_account,并分别在这三个数据库中创建对应的业务表。
--对应seata_order数据库CREATE TABLE 'tbl_order' ('id' int(11) NOT NULL AUTO_INCREMENT,'order_no' varchar(255) DEFAULT NULL,'user_id' varchar(255) DEFAULT NULL,'product_code' varchar(255) DEFAULT NULL,'count' int(11) DEFAULT 0,'amount' int(11) DEFAULT 0,PRIMARY KEY ('id')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;--对应seata_repo数据库CREATE TABLE 'tbl_repo' ('id' int(11) NOT NULL AUTO_INCREMENT,'product_code' varchar(255) DEFAULT NULL,'name' varchar(255) DEFAULT NULL,'count' int(11) DEFAULT 0,PRIMARY KEY ('id'),UNIQUE KEY ('product_code')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;-- 初始数据INSERT INTO 'tbl_repo' VALUES (1, 'TEST20200606001', '键盘', '1000');INSERT INTO 'tbl_repo' VALUES (2, 'TEST20200606002', '鼠标', '100');-- 对应seata_account数据库CREATE TABLE 'tbl_account' ('id' int(11) NOT NULL AUTO_INCREMENT,'user_id' varchar(255) DEFAULT NULL,'balance' int(11) DEFAULT 0,PRIMARY KEY ('id')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;--初始数据INSERT INTO 'tbl_account' VALUES (1, '1001', '10000')
核心方法说明
下面介绍部分主要代码:
sample-account-service:账户服务提供余额扣减的功能,具体代码如下:
4jpublic class AccountServiceImpl implements IAccountService{AccountMapper accountMapper;public ObjectResponse decreaseAccount(AccountDto accountDto) {ObjectResponse response=new ObjectResponse();try{int rs=accountMapper.decreaseAccount(accountDto.getUserId(),accountDto.getBalance().doubleValue());if(rs>0){response.setMsg(ResCode.SUCCESS.getMessage());response.setCode(ResCode.SUCCESS.getCode());return response;}response.setMsg(ResCode.FAILED.getMessage());response.setCode(ResCode.FAILED.getCode());}catch (Exception e){log.error("decreaseAccount Occur Exception:"+e);response.setCode(ResCode.SYSTEM_EXCEPTION.getCode());response.setMsg(ResCode.SYSTEM_EXCEPTION.getMessage()+"-"+e.getMessage());}return response;}}
sample-order-service:订单服务负责创建订单,并且在创建订单之前先基于Dubbo协议调用账户服务的资金扣减接口。
4jpublic class OrderServiceImpl implements IOrderService{OrderMapper orderMapper;OrderConvert orderConvert;IAccountService accountService;public ObjectResponse<OrderDto> createOrder(OrderDto orderDto) {log.info("全局事务ID:"+ RootContext.getXID());ObjectResponse response=new ObjectResponse();try {//账户扣款AccountDto accountDto = new AccountDto();accountDto.setUserId(orderDto.getUserId());accountDto.setBalance(orderDto.getOrderAmount());ObjectResponse accountRes = accountService.decreaseAccount(accountDto);//创建订单Order order=orderConvert.dto2Order(orderDto);order.setOrderNo(UUID.randomUUID().toString());orderMapper.createOrder(order);//判断扣款状态(判断可以前置)if(accountRes.getCode()!=ResCode.SUCCESS.getCode()){response.setMsg(ResCode.FAILED.getMessage());response.setCode(ResCode.FAILED.getCode());return response;}response.setMsg(ResCode.SUCCESS.getMessage());response.setCode(ResCode.SUCCESS.getCode());}catch (Exception e){log.error("createOrder Occur Exception:"+e);response.setCode(ResCode.SYSTEM_EXCEPTION.getCode());response.setMsg(ResCode.SYSTEM_EXCEPTION.getMessage()+"-"+e.getMessage());}return response;}}
sample-repo-service:库存服务提供库存扣减功能:
4jpublic class RepoServiceImpl implements IRepoService{RepoMapper repoMapper;public ObjectResponse decreaseRepo(ProductDto productDto) {ObjectResponse response=new ObjectResponse();try {int repo = repoMapper.decreaseRepo(productDto.getProductCode(), productDto.getCount());if(repo>0){response.setMsg(ResCode.SUCCESS.getMessage());response.setCode(ResCode.SUCCESS.getCode());return response;}response.setMsg(ResCode.FAILED.getMessage());response.setCode(ResCode.FAILED.getCode());}catch (Exception e){log.error("decreaseRepo Occur Exception:"+e);response.setCode(ResCode.SYSTEM_EXCEPTION.getCode());response.setMsg(ResCode.SYSTEM_EXCEPTION.getMessage()+"-"+e.getMessage());}return response;}}
sample-rest-web: 基于Spring Boot的web项目,主要用于对外提供以业务为维度的REST接口,会分别调用库存服务和订单服务,实现库存扣减及创建订单的功能。
public class OrderController {IRestOrderService restOrderService;ObjectResponse order( OrderRequest orderRequest) throws Exception {return restOrderService.handleBusiness(orderRequest);}}
RestOrderServiceImpl的具体实现如下:
4jpublic class RestOrderServiceImpl implements IRestOrderService {IRepoService repoService;IOrderService orderService;(timeoutMills = 300000, name = "sample-rest-web")public ObjectResponse handleBusiness(OrderRequest orderRequest) throws Exception {log.info("开始全局事务:xid="+ RootContext.getXID());log.info("begin order: "+orderRequest);//1. 扣减库存ProductDto productDto=new ProductDto();productDto.setProductCode(orderRequest.getProductCode());productDto.setCount(orderRequest.getCount());ObjectResponse repoRes=repoService.decreaseRepo(productDto);//2. 创建订单OrderDto orderDto=new OrderDto();orderDto.setUserId(orderRequest.getUserId());orderDto.setOrderAmount(orderRequest.getAmount());orderDto.setOrderCount(orderRequest.getCount());orderDto.setProductCode(orderRequest.getProductCode());ObjectResponse orderRes=orderService.createOrder(orderDto);if(orderRequest.getProductCode().equals("GP20200202002")){throw new Exception("系统异常");}ObjectResponse response=new ObjectResponse();response.setMsg(ResCode.SUCCESS.getMessage());response.setCode(ResCode.SUCCESS.getCode());response.setData(orderRes.getData());return response;}}
项目启动顺序及访问
这几个项目彼此之间存在依赖关系,项目的启动顺序为:
sample-seata-common为公共组件,需要先通过mvn install到本地仓库给其他服务依赖。
启动sample-account-service,它会被订单服务调用。
启动订单服务sample-order-service。
启动库存服务sample-repo-service。
启动sample-rest-web,它作为REST的业务入口。
整合Seata实现分布式事务
在上述流程中,加入库存扣减成功了,但是在创建订单的时候,入股由于账户资金不足导致失败,就会出现数据不一致的场景。按照正常的流程来说,被扣减的库存需要加回去,这就是一个分布式事务的场景。接下来我们在项目中整合Seata来解决该问题。
添加Seata Jar包依赖
分别在4个项目中添加Seata的starter组件依赖:
<dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.0.0</version></dependency>
添加Seata配置项目
同样分别在4个项目中的application.yml文件中添加Seata的配置项:
seata:enabled: truetx-service-group: sample-rest-webtransport:type: TCPserver: NIOheartbeat: true #client和server通信心跳检测开关(默认为true)enable-client-batch-send-request: truethread-factory:boss-thread-prefix: NettyBossworker-thread-prefix: NettyServerNIOWorkerserver-executor-thread-prefix: NettyServerBizHandlershare-boss-worker: falseclient-selector-thread-prefix: NettyClientSelectorclient-selector-thread-size: 1client-worker-thread-prefix: NettyClientWorkerThreadboss-thread-size: 1worker-thread-size: 8shutdown:wait: 3serialization: seata #client和server通信编解码方式compressor: noneservice:vgroup-mapping: default #TC集群,需要和Seata-Server保持一致enable-degrade: false #降级开关,默认为false,业务根据连续错误数自动降级,不走Seata事务disable-global-transaction: false #全局事务开关,默认为false,false为开启,true为关闭#grouplist: 192.168.216.128:8091 #TC服务列表,也就是Seata服务端地址,只有当注册中心为file时使用client:rm:lock:lock-retry-interval: 10lock-retry-policy-branch-rollback-on-conflict: truelock-retry-times: 30rm-async-commit-buffer-limit: 10000rm-report-retry-count: 5rm-table-meta-check-enable: falserm-report-success-enable: truetm-commit-retry-count: 5tm-rollback-retry-count: 5undo:undo-log-table: undo_logundo-data-validation: trueundo-log-serialization: jacksonlog:exception-rate: 100support:spring:datasource-autoproxy: falseregistry:type: nacosnacos:cluster: defaultserver-addr: 192.168.216.128:8848
上述配置中有几个配置项需要注意:
seata.support.spring.datasource-autoproxy: true 属性表示数据源自动代理开关,在sample-order-service、sample-account-service、sample-repo-service中设置为true,在sample-rest-web中设置为false,因为该项目并没有访问数据源,不需要代理。
seata:registry:type: nacosnacos:cluster: defaultserver-addr: 192.168.216.127:8848
tx-service-group表示指定服务所属的事务分组,如果没有指定,默认使用spring.application.name加上字符串-seata-service-group。需要注意这两项配置必须要配置一项,否则会报错。
添加回滚日志表
分别在3个数据库seata-account、seata-repo、seata-order中添加一张回滚日志表,用于记录每个数据库表操作的回滚日志,当某个服务的事务出现异常时会根据该日志进行回滚。
CREATE TABLE 'undo_log' ('id' bigint(20) NOT NULL AUTO_INCREMENT,'branch_id' bigint(20) NOT NULL,'xid' varchar(100) NOT NULL,'context' varchar(128) NOT NULL,'rollback_info' longblob NOT NULL,'log_status' int(11) NOT NULL,'log_created' datetime NOT NULL,'log_modified' datetime NOT NULL,PRIMARY KEY ('id'),UNIQUE KEY 'ux_undo_log' ('xid', 'branch_id')) ENGINE=InnoDB DEFAULT CHARSET=utf-8;
sample-rest-web增加全局事务控制
修改sample-rest-web工程的RestOrderServiceImpl,做两件事情:
增加@GlobalTransactional全局事务注解
模拟一个异常处理,当商品编号等于某个指定的值时抛出异常,触发整个事务的回滚。
4jpublic class RestOrderServiceImpl implements IRestOrderService {IRepoService repoService;IOrderService orderService;(timeoutMills = 300000, name = "sample-rest-web")public ObjectResponse handleBusiness(OrderRequest orderRequest) throws Exception {log.info("开始全局事务:xid="+ RootContext.getXID());log.info("begin order: "+orderRequest);//1. 扣减库存ProductDto productDto=new ProductDto();productDto.setProductCode(orderRequest.getProductCode());productDto.setCount(orderRequest.getCount());ObjectResponse repoRes=repoService.decreaseRepo(productDto);//2. 创建订单OrderDto orderDto=new OrderDto();orderDto.setUserId(orderRequest.getUserId());orderDto.setOrderAmount(orderRequest.getAmount());orderDto.setOrderCount(orderRequest.getCount());orderDto.setProductCode(orderRequest.getProductCode());ObjectResponse orderRes=orderService.createOrder(orderDto);if(orderRequest.getProductCode().equals("GP20200202002")){throw new Exception("系统异常");}ObjectResponse response=new ObjectResponse();response.setMsg(ResCode.SUCCESS.getMessage());response.setCode(ResCode.SUCCESS.getCode());response.setData(orderRes.getData());return response;}}
