6 张图带你彻底搞懂分布式事务 XA 模式
作者 | 朱晋君
XA 协议是由 X/Open 组织提出的分布式事务处理规范,主要定义了事务管理器 TM 和局部资源管理器 RM 之间的接口。目前主流的数据库,比如 oracle、DB2 都是支持 XA 协议的。
两阶段提交
-
同步阻塞,本地事务在 prepare 阶段锁定资源,如果有其他事务也要修改 xiaoming 这个账户,就必须等待前面的事务完成。这样就造成了系统性能下降。 -
协调节点单点故障,如果第一个阶段 prepare 成功了,但是第二个阶段协调节点发出 commit 指令之前宕机了,所有服务的数据资源处于锁定状态,事务将无限期地等待。 -
数据不一致,如果第一阶段 prepare 成功了,但是第二阶段协调节点向某个节点发送 commit 命令时失败,就会导致数据不一致。
三阶段提交
-
在协调节点和事务参与者都引入了超时机制。 -
第一阶段的 prepare 阶段分成了两步,canCommi 和 preCommit。
XA 事务语法介绍
XA {START|BEGIN} xid [JOIN|RESUME]
XA END xid [SUSPEND [FOR MIGRATE]]
XA PREPARE xid
XA COMMIT xid [ONE PHASE]XA ROLLBACK xid
XA RECOVER XA RECOVER
seata XA 简介
-
TM 开启全局事务 -
RM 向 TC 注册分支事务 -
RM 向 TC 报告分支事务状态 -
TC 向 RM 发送 commit/rollback 请求 -
TM 结束全局事务
-
第一阶段首执行 XA 开启、执行 sql、XA 结束三个步骤,之后直接执行 XA prepare。 -
第二阶段执行 XA commit/rollback。
seata XA 源码
(prefix = "spring.datasource")public DruidDataSource druidDataSource() {return new DruidDataSource();}("dataSourceProxy")public DataSource dataSource(DruidDataSource druidDataSource) {return new DataSourceProxyXA(druidDataSource);}
-
也可以根据普通 DataSource 来创建 XAConnection,但是这种方式有兼容性问题(比如 oracle),所以 seata 使用了开发者自己配置 XADataSource。 -
seata 提供的 XA 数据源代理,要求代码框架中必须使用 druid 连接池。
1. XA 第一阶段
1)开启 XA
-
向 TC 注册分支事务
-
调用数据源的 XA Start
xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
-
把 xaActive 设置为 true
2)执行 sql
3)XA end/prepare
public void commit() throws SQLException {//省略部分源代码try {// XA End: SuccessxaResource.end(xaBranchXid, XAResource.TMSUCCESS);// XA PreparexaResource.prepare(xaBranchXid);// Keep the Connection if necessarykeepIfNecessary();} catch (XAException xe) {try {// Branch Report to TC: FailedDefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),BranchStatus.PhaseOne_Failed, null);} catch (TransactionException te) {//这儿只打印了一个warn级别的日志}throw new SQLException("Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe);} finally {cleanXABranchContext();}}
-
调用数据源的 XA end -
调用数据源的 XA prepare -
向 TC 报告分支事务状态
2. XA commit
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());Object msg = rpcMessage.getBody();if (LOGGER.isInfoEnabled()) {LOGGER.info("rm client handle branch commit process:" + msg);}handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);}
public void xaCommit(String xid, long branchId, String applicationData) throws XAException {XAXid xaXid = XAXidBuilder.build(xid, branchId);//因为使用mysql,这里xaResource是MysqlXAConnectionxaResource.commit(xaXid, false);releaseIfNecessary();}
总结
