我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:[email protected]
作者: Jeremy Xu
链接:
https://jeremyxu2010.github.io/2020/03/%E5%BE%AE%E6%9C%8D%E5%8A%A1%E4%B8%AD%E7%9A%84%E5%88%86%E5%B8%83%E5%BC%8F%E4%BA%8B%E5%8A%A1%E6%96%B9%E6%A1%88/
两阶段提交-commit
两阶段提交-rollback
2PC看着挺美好,但其实是存在缺陷的:
2PC在执行过程中可能发生协调者或者参与者突然宕机的情况,在不同时期宕机可能有不同的现象。
情况一:协调者挂了,参与者没挂
这种情况其实比较好解决,只要找一个协调者的替代者。当他成为新的协调者的时候,询问所有参与者的最后那条事务的执行情况,他就可以知道是应该做什么样的操作了。所以,这种情况不会导致数据不一致。
情况二:参与者挂了,协调者没挂
这种情况其实也比较好解决。如果协调者挂了。那么之后的事情有两种情况:
第一个是挂了就挂了,没有再恢复。那就挂了呗,反正不会导致数据一致性问题。 第二个是挂了之后又恢复了,这时如果他有未执行完的事务操作,直接取消掉,然后询问协调者目前我应该怎么做,协调者就会比对自己的事务执行记录和该参与者的事务执行记录,告诉他应该怎么做来保持数据的一致性。
情况三:参与者挂了,协调者也挂了
这种情况比较复杂,我们分情况讨论。
协调者和参与者在第一阶段挂了。
由于这时还没有执行commit操作,新选出来的协调者可以询问各个参与者的情况,再决定是进行commit还是roolback。因为还没有commit,所以不会导致数据一致性问题。 第二阶段协调者和参与者挂了,挂了的这个参与者在挂之前并没有接收到协调者的指令,或者接收到指令之后还没来的及做commit或者roolback操作。
这种情况下,当新的协调者被选出来之后,他同样是询问所有的参与者的情况。只要有机器执行了abort(roolback)操作或者第一阶段返回的信息是No的话,那就直接执行roolback操作。如果没有人执行abort操作,但是有机器执行了commit操作,那么就直接执行commit操作。这样,当挂掉的参与者恢复之后,只要按照协调者的指示进行事务的commit还是roolback操作就可以了。因为挂掉的机器并没有做commit或者roolback操作,而没有挂掉的机器们和新的协调者又执行了同样的操作,那么这种情况不会导致数据不一致现象。 第二阶段协调者和参与者挂了,挂了的这个参与者在挂之前已经执行了操作。但是由于他挂了,没有人知道他执行了什么操作。
这种情况下,新的协调者被选出来之后,如果他想负起协调者的责任的话他就只能按照之前那种情况来执行commit或者roolback操作。这样新的协调者和所有没挂掉的参与者就保持了数据的一致性,我们假定他们执行了commit。但是,这个时候,那个挂掉的参与者恢复了怎么办,因为他之前已经执行完了之前的事务,如果他执行的是commit那还好,和其他的机器保持一致了,万一他执行的是roolback操作那?这不就导致数据的不一致性了么?虽然这个时候可以再通过手段让他和协调者通信,再想办法把数据搞成一致的,但是,这段时间内他的数据状态已经是不一致的了! 所以,2PC协议中,如果出现协调者和参与者都挂了的情况,有可能导致数据不一致。
CanCommit
、
PreCommit
、
DoCommit
三个阶段。在第一阶段,只是询问所有参与者是否可可以执行事务操作,并不在本阶段执行事务操作。当协调者收到所有的参与者都返回YES时,在第二阶段才执行事务操作,然后在第三阶段在执行commit或者rollback。
3PC相对2PC的优点:
直接分析协调者和参与者都挂的情况。
第二阶段协调者和参与者挂了,挂了的这个参与者在挂之前已经执行了操作。但是由于他挂了,没有人知道他执行了什么操作。
这种情况下,当新的协调者被选出来之后,他同样是询问所有的参与者的情况来觉得是commit还是roolback。这看上去和二阶段提交一样啊?他是怎么解决一致性问题的呢? 看上去和二阶段提交的那种数据不一致的情况的现象是一样的,但仔细分析所有参与者的状态的话就会发现其实并不一样。我们假设挂掉的那台参与者执行的操作是commit。那么其他没挂的操作者的状态应该是什么?他们的状态要么是prepare-commit要么是commit。因为3PC的第三阶段一旦有机器执行了commit,那必然第一阶段大家都是同意commit。所以,这时,新选举出来的协调者一旦发现未挂掉的参与者中有人处于commit状态或者是prepare-commit的话,那就执行commit操作。否则就执行rollback操作。这样挂掉的参与者恢复之后就能和其他机器保持数据一致性了。(为了简单的让大家理解,笔者这里简化了新选举出来的协调者执行操作的具体细节,真实情况比我描述的要复杂) 简单概括一下就是,如果挂掉的那台机器已经执行了commit,那么协调者可以从所有未挂掉的参与者的状态中分析出来,并执行commit。如果挂掉的那个参与者执行了rollback,那么协调者和其他的参与者执行的肯定也是rollback操作。 所以,再多引入一个阶段之后,3PC解决了2PC中存在的那种由于协调者和参与者同时挂掉有可能导致的数据一致性问题。
3PC也不是完美的,同样存在问题:
在doCommit阶段,如果参与者无法及时接收到来自协调者的doCommit或者rebort请求时,会在等待超时之后,会继续进行事务的提交。 所以,由于网络原因,协调者发送的abort响应没有及时被参与者接收到,那么参与者在等待超时之后执行了commit操作。这样就和其他接到abort命令并执行回滚的参与者之间存在数据不一致的情况。
由于微服务间无法直接进行数据访问,微服务间互相调用通常通过RPC(Dubbo)或Http API(Spring Cloud)进行,所以已经无法使用TM统一管理微服务的RM。 不同的微服务使用的数据源类型可能完全不同,如果微服务使用了NoSQL之类不支持事务的数据库,则事务根本无从谈起。 即使微服务使用的数据源都支持事务,那么如果使用一个大事务将许多微服务的事务管理起来,这个大事务维持的时间,将比本地事务长几个数量级。如此长时间的事务及跨服务的事务,将为产生很多锁及数据不可用,严重影响系统性能。
BASE理论由eBay的架构师Dan Pritchett提出,BASE理论是对CAP理论的延伸,核心思想是即使无法做到强一致性,应用应该可以采用合适的方式达到最终一致性。BASE是指基本可用(Basically Available)、软状态( Soft State)、最终一致性( Eventual Consistency)。
基本可用:指分布式系统在出现故障的时候,允许损失部分可用性,即保证核心可用。 软状态:允许系统存在中间状态,而该中间状态不会影响系统整体可用性。分布式存储中一般一份数据至少会有三个副本,允许不同节点间副本同步的延时就是软状态的体现。 最终一致性:最终一致性是指系统中的所有数据副本经过一定时间后,最终能够达到一致的状态。弱一致性和强一致性相反,最终一致性是弱一致性的一种特殊情况。
在微服务的架构下,有可能出现网络IO问题或者服务器宕机的问题,如果这些问题出现在时序图的第7步,使得消息投递后无法正常通知主服务(网络问题),或无法继续提交事务(宕机),那么主服务将会认为消息投递失败,会滚主服务业务,然而实际上消息已经被从服务消费,那么就会造成主服务和从服务的数据不一致。具体场景可见下面两张时序图。
事件服务(在这里就是消息服务)与业务过于耦合,如果消息服务不可用,会导致业务不可用。应该将事件服务与业务解耦,独立出来异步执行,或者在业务执行后先尝试发送一次消息,如果消息发送失败,则降级为异步发送。
业务服务在提交前,向事件服务发送事件,事件服务只记录事件,并不发送。业务服务在提交或回滚后通知事件服务,事件服务发送事件或者删除事件。不用担心业务系统在提交或者会滚后宕机而无法发送确认事件给事件服务,因为事件服务会定时获取所有仍未发送的事件并且向业务系统查询,根据业务系统的返回来决定发送或者删除该事件。 外部事件虽然能够将业务系统和事件系统解耦,但是也带来了额外的工作量:外部事件服务比起本地事件服务来说多了两次网络通信开销(提交前、提交/回滚后),同时也需要业务系统提供单独的查询接口给事件系统用来判断未发送事件的状态。
在RocketMQ这个方案里,事务消息作为一种异步确保型事务, 将两个事务分支通过 MQ 进行异步解耦,RocketMQ 事务消息的设计流程同样借鉴了两阶段提交理论,整体交互流程如下图所示:
事务发起方首先发送 prepare 消息到 MQ。
在发送 prepare 消息成功后执行本地事务。
根据本地事务执行结果返回 commit 或者是 rollback。
如果消息是 rollback,MQ 将删除该 prepare 消息不进行下发,如果是 commit 消息,MQ 将会把这个消息发送给 consumer 端。
如果执行本地事务过程中,执行端挂掉,或者超时,MQ 将会不停的询问其同组的其他 producer 来获取状态。
Consumer 端的消费成功机制有 MQ 保证。
这里浏览下RocketMQ这种方案的代码:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr("10.10.15.246:9876;10.10.15.247:9876");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TranTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
TransactionListener
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
TransactionProducer
涉及到2个角色:本地事务执行器(代码中的
TransactionListenerImpl
的
executeLocalTransaction
方法)、服务器回查客户端Listener(代码中
TransactionListenerImpl
的
checkLocalTransaction
方法)。
prepare
状态,对消费者还不可见,需要本地事务执行器返回RMQ一个确认消息。只有当确认完之后,才会将消息的状态由消费端不可见的
prepare
状态更新为消费者端可见的
commied
状态,如果本地事务执行器返回的是
rollbak
,则RMQ直接删除该
prepare
状态的消息。
prepare
状态事务消息最终应该的状态,从而将消息由消费端不可见的
prepare
状态更新为消费者端可见的
commied
状态或直接删除prepare状态的消息。
各个方案均有优缺点,按照业务场景及开发团队的能力水平选择一种合适的方案即可。
·END·
我们都是架构师!
关注架构师(JiaGouX),添加“星标”
获取每天技术干货,一起成为牛逼架构师
技术群请加若飞:1321113940 进架构师群
投稿、合作、版权等邮箱:[email protected]