你了解Spring事务吗?
01 事务传播行为
1.1 什么是事务传播行为
事务传播行为用来描述某个事务传播行为修饰的方法被嵌套在另一个方法时事务时是如何传播的。
1.2 spring七种事务传播行为
传播行为 |
用途 |
备注 |
propagation_required |
表示当前方法必须运行在一个事务中。如果当前没有事务,就新建一个事务,如果已经存在一个事务, 那么该方法加入到该事务。 |
同一个事务 |
propagation_requires_new |
表示当前方法必须运行自己的事务中。如果当前存在一个事务,那么这个事务将在这个方法运行期间被挂起。 |
两个不同事务 |
propagation_nested |
表示当前方法必须运行在一个事务中。如果当前存在事务,则嵌套在事务内执行,如果当前没有事务,就新建新建事务。 |
父子事务 |
propagation_mandatory |
表示当前方法必须运行在一个事务中,如果当前不存在事务,将抛出异常。 |
|
propagation_supports |
表示当前方法不需要运行在一个事务中,但如果有一个事务已经存在,该方法也可以运行在这个事务中,如果当前没有事务,就以非事务方式执行 |
|
propagation_not_supported |
表示当前方法不需要运行在一个事务中(非事务方式执行操作),如果当前存在事务,则把当前事务挂起。 |
|
propagation_never |
表示当前方法不需要运行在一个事务中(非事务方式执行操作),如果当前存在事务,则抛出异常。 |
02 spring 事务隔离级别
2.1 问题
多个事务同时执行会出现如下问题
问题类型 |
说明 |
脏读 |
事务A读取到事务B未提交的变更 |
不重复读 |
事务A在多次查询同一个数据时,事务B对该数据修改,此时事务A再次查询该数据,与第一次查询出来的数据不一致。 |
幻读 |
当事务A读取某个范围的记录时,事务B在这个范围记录中插入一条新记录,导致事务A读取到事务B的新插入的记录。 |
2.2 解决方案
为了解决以上问题,可以通过设置隔离级别来解决
隔离级别 |
含义 |
出现的问题 |
解决的问题 |
读未提交 |
一个事务还未提交,它所做的变更被别的事务看到 |
出现脏读、不重复读、幻读 |
|
读提交 |
一个事务提交后,它所做的变更才能被别的事务看到 |
出现不重复读、幻读 |
解决脏读问题 |
可重复读 |
一个事务在执行过程中查询到的数据,总是跟这个事务在开启事务时查询到的数据一致 |
出现幻读 |
解决不可重读问题 |
串行化 |
对于同一行记录"写"会加“写锁”,“读”会加“读锁”。 |
解决幻读问题 |
2.3 示例
时间点 |
事务A |
事务B |
T1 |
start transaction 查询得到1 |
start transaction |
T2 |
查询得到1 更新值为2 |
|
T3 |
查询得到V1 |
|
T4 |
commit |
|
T5 |
查询得到V2 |
|
T6 |
commit |
|
T7 |
查询得到V3 |
执行结果
隔离级别 |
结果 |
读未提交 |
v1 = 2, v2 = 2, v3 =2 |
读提交 |
V1 = 1 V2=2, v3 =2 |
可重复读 |
v1 = 1, v2 = 1 , v3=2 |
03 前期准备
3.1 AOP相关
3.1.1 AOP代理两种方式
半自动(显示创建AOP代理) |
全自动(自动代理) |
使用FactoryBean或类似的Bean显示创建AOP代理 |
AOP代理创建交由Spring容器自动生成代理,而不是用户进行显示创建AOP代理,它是基于BeanPostProcessor |
3.1.2 自动代理
自动代理有三种方式,这三种方式都实现BeanPostProcessor。它们根据一些规则自动在容器实例化Bean时为匹配的Bean生成代理实例。下面分别介绍三种自动代理的实例。
自动代理创建器 |
含义 |
用途 |
BeanNameAutoProxyCreator |
bean配置名规则 |
为一组特定配置名的Bean自动创建代理实例 |
DefaultAdvisorAutoProxyCreator |
Advisor匹配规则 |
它会对容器中所有的Advisor进行扫描,自动将这些切面应用到匹配的bean中,为其生成代理实例 |
AnnotationAwareAspectJAutoProxyCreator |
AspectJ注解标签 |
对有AspectJ注解的Bean创建代理实例 |
3.2 NameSpaceHandler
命名空间处理器是对自定义标签(<tx:annotation-drive/>)进行解析。NameSpaceHandler加载流程如下图
04 事务介绍
源码分析是基于spring4.x,基于DataSourceTransactionManager
spring 事务使用技术:Spring Aop
4.1 核心流程
PlatformTransactionManager是Spring统一抽象的事务管理接口
DataSourceTransactionManager是PlatformTransactionManager实现,在使用时需要将DataSource注入
Mysql的Innodb引擎是支持事务的
4.2 注解(@Transactional)
下面分别介绍一下注解@Transactional中属性
属性 |
用途 |
propagation |
事务传播行为,一共七种,详情请参考1.2小节 |
isolation |
事务隔离级别,一共四种,详情请参考2.2小节,Mysql的隔离级别是可重复读 |
timeout |
事务的超时时间,默认是-1,不超时 |
readonly |
用来设置事务为只读事务。 只读事务:当事务设置为只读事务时,在整个事务的过程中,其它提交的事务内容对当前事务是不可见。 只读事务用途:保持整个事务的数据一致性,不会出现数据不一致的情况。 注:只读事务只能有读操作,不能有写操作,否则会报错。 |
rollbackFor |
用来指定某些异常进行回滚,当方法内抛出异常满足指定的异常时,进行事务回滚。 |
rollbackForClassName |
与rollbackFor类似 |
noRollbackFor |
与rollback正好相反 |
noRollbackForClassName |
与noRollbackFor |
05 声明式事务AOP部分源码解析
5.1 声明式事务主流程
下面主要是事务运行(关于事务AOP)整个流程。建议阅读时参考源码进行或者将Spring源码下载下来阅读,有助于理解Aop如何在事务应用的。
// 目的是注册AnnotationDrivenBeanDefinitionParser
TxNamespaceHandler.init()
// 说明当前事务是声明式事务
AnnotationDrivenBeanDefinitionParser.parse()
//注册自动代理创建器(代理类)和三个bean
AnnotationDrivenBeanDefinitionParser.AopAutoProxyConfigurer.configureAutoProxy()
// 目的是InfrastructureAdvisorAutoProxyCreator自动代理创建器
AopNamespaceUtils.registerAutoProxyCreatorIfNecessary
// 创建三个bean AnnotationTransactionAttributeSource、TransactionInterceptor、BeanFactoryTransactionAttributeSourceAdvisor
new RootBeanDefinition()
ParserContext.getRegistry()
// 目的是向Bean容器中注册bean
DefauultListBeanFactory.registerBeanDefinition()
// 注:InfrastructureAdvisorAutoProxyCreator与DefaultAdvisorAutoProxyCreator类似,
// 它们继承AbtractAdvisorAutoProxyCreator,最终实现InfrastructureAdvisorAutoProxyCreator接口
// 然后,在spring容器中,所有bean在实例时,都会调用postProcessAfterInstantiation方法,
// 具体可以参考Spring bean生命周期
AbstractAutoProxyCreator.postProcessAfterInstantitation()
// 找到增强,并创建被增强对象代理实例
AbstractAutoProxyCreator.wrapIfNecessary()
AbstractAdvisorAutoProxyCreator.getAdvicesAndAdvisorsForBean()
AbstractAdvisorAutoProxyCreator.findAligibleAdvisors()
AbstractAdvisorAutoProxyCreator.findAdvisorsThatCanApply()
AopUtils.findAdvisorsThatCanApply()
// 判断目标方法是否有合适的切面
AopUtils.canApply()
// 匹配到注解@Transactional
TransactionAttributeSourcePointcut.matches()
AbstractFallbackTransactionAttributeSource.getTransactionAttribute()
AbstractFallbackTransactionAttributeSource.computeTransactionAttribute()
AnnotationTransactionAttributeSource.determineTransactionAttribute()
// 解析事务注解元素属性
SpringTransactionAnnotationParser.parseTransactionAnnotation(AnnotationElement element)
// 解析事务注解的每个属性
SpringTransactionAnnotationParser.parseTransactionAnnotation(AnnotationAttributes attributes)
AbstractAdvisorAutoProxyCreator.createProxy()
5.2 主流程的补充解释
在AopAutoProxyConfigurer.configureAutoProxy()方法中创建三个bean
Bean |
类比Aop |
说明 |
AnnotationTransactionAttributeSource |
Pointcut |
该类相当于AOP中Pointcut,虽然它没有继承Pointcut,但是在BeanFactoryTransactionAttributeSource使用TransactionAttributeSourcePointcut对其进行包装 |
TransactionInterceptor |
Advice |
该类实现MethodInterceptor,最终实现Advice接口,进行方法拦截 |
BeanFactoryTransactionAttributeSourceAdvisor |
Advisor |
该类实现Advisor接口,对上面信息进行包装。 |
5.3 小结
该部分主要对事务属性进行解析,找到增强并创建被增强对象的代理实例。
06 事务拦截器源码分析(TransactionInterceptor)
6.1 事务拦截器主流程分析
TransactionInterceptor.invoke()
TransactionAspectSupport.invokeWithTransaction()
// 决定事务管理器,如使用Spring-jdbc的话DataSourceTransactionManager
TransactionAspectSupport.determineTransactionManager()
// 声明式事务
TransactionAspectSupport.createTransactionIfNecessary()
// 执行被增强的方法
Invocation.proceedWithTransaction()
// 回滚方法
TransactionAspectSupport.completeTransactionAfterThrowing()
// 清理事务信息
TransactionAspectSupport.cleanTransactionInfo()
// 提交事务
TransactionAspectSupport.commitTransactionAfterReturn()
6.2 开启事务
6.2.1 开启事务(createTransactionIfNecessary)
protected TransactionAspectSupport.TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
if (txAttr != null && ((TransactionAttribute)txAttr).getName() == null) {
txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 获取事务状态status
status = tm.getTransaction((TransactionDefinition)txAttr);
} else if (this.logger.isDebugEnabled()) {
this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured");
}
}
//根据事务管理器、事务属性、事务状态和其他属性创建事务信息
return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
}
6.2.2 获取事务(getTransaction)
主要操作是:
获取事务
如果当前线程存在事务,则分情况而定
事务超时验证
当前线程不存事务且传播行为声明为Mandatory,将抛出异常针对当前线程同步事务状态
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
//1.如果当前线程已经有数据库连接,则使用原来的连接
Object transaction = this.doGetTransaction();
boolean debugEnabled = this.logger.isDebugEnabled();
if (definition == null) {
definition = new DefaultTransactionDefinition();
//2.判断当前线程是否存在事务
// 判读依据是当前线程存储的连接不为空且连接中(ConnectionHolder)中transactionActive属性不为空
if (this.isExistingTransaction(transaction)) {
// 当前线程已经存在事务,分情况而定
return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);
// 3.事务超时设置验证
} else if (((TransactionDefinition)definition).getTimeout() < -1) {
throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout());
//4. 当前线程不存事务且传播行为声明为Mandatory,将抛出异常
} else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) {
throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
} else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {
if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = this.getTransactionSynchronization() == 0;
return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
// 5.如果事务传播行为为Required、Required_new、Nested三个中一个
} else {
AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
if (debugEnabled) {
this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
}
try {
boolean newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 新建事务
this.doBegin(transaction, (TransactionDefinition)definition);
// 将事务信息记录到当前线程中,如果是新连接,同步新事务的设置,针对于当前线程,如隔离水平、事务是否激活
this.prepareSynchronization(status, (TransactionDefinition)definition);
return status;
} catch (RuntimeException var7) {
this.resume((Object)null, suspendedResources);
throw var7;
} catch (Error var8) {
this.resume((Object)null, suspendedResources);
throw var8;
}
}
}
6.2.3 处理已经存在的事务(handleExistTransaction)
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException {
//1.当事务的传播行为为Never时,将抛出异常
if (definition.getPropagationBehavior() == 5) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
} else {
AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources;
boolean newSynchronization;
//2.当事务的传播行为为Not_Supported,将当前的事务挂起
if (definition.getPropagationBehavior() == 4) {
if (debugEnabled) {
this.logger.debug("Suspending current transaction");
}
// 事务挂起的目的是记录原有事务的状态,以便于后续操作对事务的恢复
suspendedResources = this.suspend(transaction);
newSynchronization = this.getTransactionSynchronization() == 0;
return this.prepareTransactionStatus(definition, (Object)null, false, newSynchronization, debugEnabled, suspendedResources);
//3.当事务的传播行为是Required_new,将当前事务事务挂起,同时创建新的事务
} else if (definition.getPropagationBehavior() == 3) {
if (debugEnabled) {
this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
}
suspendedResources = this.suspend(transaction);
try {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
} catch (RuntimeException var7) {
this.resumeAfterBeginException(transaction, suspendedResources, var7);
throw var7;
} catch (Error var8) {
this.resumeAfterBeginException(transaction, suspendedResources, var8);
throw var8;
}
} else {
boolean newSynchronization;
//4.当事务的传播行为是Nested,嵌套事务
if (definition.getPropagationBehavior() == 6) {
if (!this.isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions by default - specify 'nestedTransactionAllowed' property with value 'true'");
} else {
if (debugEnabled) {
this.logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (this.useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = this.prepareTransactionStatus(definition, transaction, false, false, debugEnabled, (Object)null);
status.createAndHoldSavepoint();
return status;
} else {
newSynchronization = this.getTransactionSynchronization() != 2;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, (Object)null);
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
}
}
} else {
if (debugEnabled) {
this.logger.debug("Participating in existing transaction");
}
if (this.isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != -1) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, "ISOLATION_") : "(unknown)"));
}
}
if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
}
}
newSynchronization = this.getTransactionSynchronization() != 2;
return this.prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, (Object)null);
}
}
}
}
6.2.4 构造事务(doBegin)
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
Connection con = null;
try {
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.obtainDataSource().getConnection();
if (this.logger.isDebugEnabled()) {
this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// 更改事务自动提交,交由Spring进行控制
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (this.logger.isDebugEnabled()) {
this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
this.prepareTransactionalConnection(con, definition);
// 设置标志位,表示当前连接已经被事务激活
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = this.determineTimeout(definition);
if (timeout != -1) {
// 设置连接的过期时间
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 判断当前是否是新的连接
if (txObject.isNewConnectionHolder()) {
//将connectHolder绑定到当前线程
TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
}
} catch (Throwable var7) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.obtainDataSource());
txObject.setConnectionHolder((ConnectionHolder)null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
}
}
6.3 其它
事务回滚和事务的提交比较简单,内部是直接调用数据库直接事务回滚和提交的
注:事务回滚判断依据是抛出的异常是否RuntimeException或Error的类型
6.4 小结
利用TransactionInterceptor拦截,并对数据库资源进行开启事务、回滚事务 或提交事务。
07 总结
本文主要从事务基础理论(事务传播行为、隔离性),Spring事务源码(从AOP方面进行分析)和TransactionInterceptor源码分析如何开启事务、回滚事务或提交事务。
由于个人技术有限,如果有理解不到位或错误的地方,请点击“看一看”,同时留下评论,我会根据你们的建议进行修改。
---------------------TK-------------------
封面图片来源:互联网
长按下图二维码,即刻关注【天空奇点】
分享每天点滴知识,成就更好的你