6 张图带你彻底搞懂分布式事务 XA 模式
作者 | 朱晋君
XA 协议是由 X/Open 组织提出的分布式事务处理规范,主要定义了事务管理器 TM 和局部资源管理器 RM 之间的接口。目前主流的数据库,比如 oracle、DB2 都是支持 XA 协议的。
两阶段提交
-
同步阻塞,本地事务在 prepare 阶段锁定资源,如果有其他事务也要修改 xiaoming 这个账户,就必须等待前面的事务完成。这样就造成了系统性能下降。 -
协调节点单点故障,如果第一个阶段 prepare 成功了,但是第二个阶段协调节点发出 commit 指令之前宕机了,所有服务的数据资源处于锁定状态,事务将无限期地等待。 -
数据不一致,如果第一阶段 prepare 成功了,但是第二阶段协调节点向某个节点发送 commit 命令时失败,就会导致数据不一致。
三阶段提交
-
在协调节点和事务参与者都引入了超时机制。 -
第一阶段的 prepare 阶段分成了两步,canCommi 和 preCommit。
XA 事务语法介绍
XA {START|BEGIN} xid [JOIN|RESUME]
XA END xid [SUSPEND [FOR MIGRATE]]
XA PREPARE xid
XA COMMIT xid [ONE PHASE]
XA ROLLBACK xid
XA RECOVER XA RECOVER
seata XA 简介
-
TM 开启全局事务 -
RM 向 TC 注册分支事务 -
RM 向 TC 报告分支事务状态 -
TC 向 RM 发送 commit/rollback 请求 -
TM 结束全局事务
-
第一阶段首执行 XA 开启、执行 sql、XA 结束三个步骤,之后直接执行 XA prepare。 -
第二阶段执行 XA commit/rollback。
seata XA 源码
"spring.datasource") (prefix =
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
"dataSourceProxy") (
public DataSource dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxyXA(druidDataSource);
}
-
也可以根据普通 DataSource 来创建 XAConnection,但是这种方式有兼容性问题(比如 oracle),所以 seata 使用了开发者自己配置 XADataSource。 -
seata 提供的 XA 数据源代理,要求代码框架中必须使用 druid 连接池。
1. XA 第一阶段
1)开启 XA
-
向 TC 注册分支事务
-
调用数据源的 XA Start
xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
-
把 xaActive 设置为 true
2)执行 sql
3)XA end/prepare
public void commit() throws SQLException {
//省略部分源代码
try {
// XA End: Success
xaResource.end(xaBranchXid, XAResource.TMSUCCESS);
// XA Prepare
xaResource.prepare(xaBranchXid);
// Keep the Connection if necessary
keepIfNecessary();
} catch (XAException xe) {
try {
// Branch Report to TC: Failed
DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(),
BranchStatus.PhaseOne_Failed, null);
} catch (TransactionException te) {
//这儿只打印了一个warn级别的日志
}
throw new SQLException(
"Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe
.getMessage(), xe);
} finally {
cleanXABranchContext();
}
}
-
调用数据源的 XA end -
调用数据源的 XA prepare -
向 TC 报告分支事务状态
2. XA commit
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
Object msg = rpcMessage.getBody();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("rm client handle branch commit process:" + msg);
}
handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
}
public void xaCommit(String xid, long branchId, String applicationData) throws XAException {
XAXid xaXid = XAXidBuilder.build(xid, branchId);
//因为使用mysql,这里xaResource是MysqlXAConnection
xaResource.commit(xaXid, false);
releaseIfNecessary();
}
总结