分布式系统之RAFT协议
之前的文章里有介绍过ZAB,那我们今天继续介绍由Paxos引申出的另一个简化版协议:Raft。为什么这么说,因为Raft协议就是Diego Ongaro和John Ousterhout两人在使用Paxos的过程中,觉得它实在难以理解且工程实现非常复杂,故而从简单易懂的角度,设计出的协议。这在John Ousterhout在Stanford的个人主页(https://web.stanford.edu/~ouster/cgi-bin/home.php)和论文中都有提到。
既然和Paxos、ZAB一样同是共识算法,或者叫一致性协议,所以其主要的作用也比较类似,是为了解决分布式环境下多副本之间的数据一致性问题。但不同的是ZAB是Yahoo公司为Zookeeper量身定制的,而Raft在工程上的应用则更加广泛,毕竟Raft的设计初衷就是易于理解和易于实现。其中有大家比较熟知的分布式数据库TiDB,消息队列产品RocketMQ,服务治理模块Consul等。
共识算法一般可以分为两大类,一类是诸如Raft和ZAB这样,集群中必须存在一个Leader节点和多个Follower节点,Leader需要将数据同步到其他的Follower,这种称为强领导者共识(Leader-based)模式。而另一种模式为弱领导者(Leader-less)共识模式,即系统中所有节点角色的权利都是均等的,此类算法比如Paxos。可以想象前者相比后者,会增加一个选举阶段,而且存在单点问题,但是工程实现逻辑上要简单很多。因为后者经常需要处理节点之间的冲突情况。
Raft采用了复制状态机(State Machine Replication)的实现模式,这是一个在分布式系统中被广泛应用的模式,说起来很简单,复制状态机就是通过一系列有序的操作日志来达到数据一致性的效果,即每条日志中包含了一个操作命令,Leader有序的向所有Follower同步这些日志,这样只要保障了所有节点上的操作日志是一致的,那么对于所有节点上任意相同的日志索引位置,其内容也是一致的。
如下图(图 1)所示,是三个节点的操作日志,假设左侧为旧日志,右侧为新日志,那么在T1时刻,该三个节点的内容都是(X=5, Y=4):
图 1:基于日志的复制状态机
协议约束:
为了保障Raft协议的有效性,一个实现了Raft协议的工程,其必须满足如下约束,或者说达到如下条件。
· Election Safety:在任意任期(term)中,最多只能存在一个Leader。
· Leader Append-Only:Leader对日志只能追加,不能删除和修改。
· Log Matching:如果两个节点上某日志的索引(index)和任期(term)相同,则这两个节点从0至该索引(index)之间的所有日志必须完全一致。
· Leader Completeness:如果某条日志在某个任期(term)被提交,那么后续任意任期(term)的Leader都会拥有该条日志,说明已经被提交的日志是不会丢失的。
· State Machine Safety:如果某个索引位置的日志已经应用于本地状态机,那么其他所有节点对于该索引位置都必须应用相同的日志。
后面我们可以逐步看到Raft具体是如何实现以上约束的。
节点状态:
在Raft协议中,每个节点会有三种不同的状态,分别是:Leader,Follower,和Candidate。
Leader负责接收客户端的请求,然后以2PC的方式(图 2)向各个Follower提交操作日志,当超过半数节点成功同步了该日志到本地,Leader则认为日志同步成功,发起提交操作,更新本地状态机,并向所有Follower发送提交请求,Follower收到提交请求后,提交该日志,更新本地状态机。此时客户端的请求成功完成。
图 2:Raft的2PC
从上可以看出典型的Raft请求一般有两种,一个是Leader发给Follower的同步日志请求和心跳请求,这个叫AppendEntries RPC,另一个是Candidate发送选票使用的,叫RequestVote RPC。下面简单说一下这两种请求的内容。
请求类型:
1、AppendEntries RPC
请求(Request):
· term:Leader的任期编号。
· leaderId:Leader ID。
· prevLogEntry:上次所同步日志的索引编号。
· prevLogTerm:上次所同步日志的Leader任期编号。
· Entries[]:本次要同步的日志,如果该内容为空,则代表为心跳请求。
· leaderCommit:本次要同步日志的索引条目。
返回(Response):
· term:接收节点当前的任期编号。
· success:如果本次要同步的日志被接受,则返回true。
2、RequestVote RPC
请求(Request):
· term:Candidate的任期编号。
· candidateId:Candidate ID。
· lastLogIndex:Candidate所拥有的最新日志的索引编号。
· lastLogTerm:Candidate所拥有的最新日志的Leader任期编号。
返回(Response):
· term:接收节点当前的任期编号。
· voteGranted:如果接收节点同意投票给该Candidate,则返回true。
现在看这些内容可能还有些费解,没有关系,结合之后我们所介绍的Raft运行过程,就会很容易理解了。
运行过程:
原论文中将Raft的过程分成三个子问题来介绍,分别是:Leader选举, 日志复制,安全性。
1、Leader选举(leader election)
系统初次启动时,所有节点的状态都是Follower,既然没有Leader,那么所有的Follower节点自然也就收不到来自Leader的心跳。所以当有某个Follower倒计时完成后,它就会切换到Candidate状态,把自身的term加1,然后向其它所有节点发送RequestVote RPC请求。
其它节点在收到一个RequestVote RPC后,首先判断发出请求的Candidate节点的任期(term)和日志进度(index)是否符合预期,通过验证则采用先到先得的方式进行投票,即每个节点仅有一次投票机会。
当有Candidate收到的票数超过集群半数之后,则选举成功,将自己的状态从Candidate切换成Leader,并立刻向其它所有节点发出一个心跳,告知选举已结束。其它节点收到信息后,将自己的状态转换为Follower。
如果集群中有多个Candidate收到了相同的票数,或者说没有一个Candidate拿到了超过半数的选票,在等待超时后,会认为此次选举失败,则会再次发出RequestVote RPC,直到选取成功,此处Raft协议为了避免反复选不出Leader情况出现,Candidate每轮选举的超时时长都是一个从150ms至300ms的随机值,由此错开多个Candidate发出RequestVote RPC的时间。
下面是RequestVote RPC的处理逻辑源码(码 1),供大家参考。
func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
//如果Candidate的Term比当前节点的Term小,则不投票。
if req.Term < r.getCurrentTerm() {
return
}
//如果Candidate的Term比当前节点大,则当前节点变为Follower。
if req.Term > r.getCurrentTerm() {
r.logger.Debug("lost leadership because received a requestVote with a newer term")
r.setState(Follower)
r.setCurrentTerm(req.Term)
resp.Term = req.Term
}
//如果该节点在本轮已经投过票,则不能再次投票。
if lastVoteTerm == req.Term && lastVoteCandBytes != nil {
r.logger.Info("duplicate requestVote for same term", "term", req.Term)
if bytes.Compare(lastVoteCandBytes, req.Candidate) == 0 {
r.logger.Warn("duplicate requestVote from", "candidate", candidate)
resp.Granted = true
}
return
}
//如果Candidate日志的Term小于历史Term,则不投票。
lastIdx, lastTerm := r.getLastEntry()
if lastTerm > req.LastLogTerm {
r.logger.Warn("rejecting vote request since our last term is greater",
"candidate", candidate,
"last-term", lastTerm,
"last-candidate-term", req.LastLogTerm)
return
}
//如果Candidate的日志编号小于本地编号,则不投票。
if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
r.logger.Warn("rejecting vote request since our last index is greater",
"candidate", candidate,
"last-index", lastIdx,
"last-candidate-index", req.LastLogIndex)
return
}
//记录当前选票,确定投票。
if err := r.persistVote(req.Term, req.Candidate); err != nil {
r.logger.Error("failed to persist vote", "error", err)
return
}
resp.Granted = true
r.setLastContact()
return
}
// 码 1:RequestVote RPC的处理逻辑 raft.go
2、日志复制(log replication)
日志复制是复制状态机的主要同步模式,也就是最为核心的功能。
假设一个需求从客户端发出到系统的任意节点,如果该节点是Follower,则会将请求转发给Leader,Leader将该请求追加到本地日志中,然后广播AppendEntries RPC到所有Follower,Follower收到请求后,对内容和来源进行验证,包括prevLogEntry和prevLogTerm是否合法,确保同步前本地的日志记录和Leader是一致的,验证通过后,将新日志追加到本地日志末尾,然后反馈给Leader。Leader收到的确认超过半数后,才会认为该操作是可以被commit的,此时会将该条操作应用到本地的状态机中,并向所有Follower广播commit操作,最后返回操作成功的消息给客户端。
当时实际情况会更加复杂,当一个选举刚刚结束,有新的Leader当选的时候,Leader和Follower之间的日志进度很可能是不一样的,这是所有基于Quorum原则的系统必然要面对的一个问题,有的Follower没有成功同步Leader的操作日志,那么Follower记录就会少,有时集群中绝大部分Follower和Candidate的操作日志进度都要比某个Follower的进度早,但是因为Quorum原则规定Candidate只要获得超过半数的选票即可当选Leader,所以Follower有可能比Leader的日志还要新。
那么新的Leader如何保障其他Follower的日志记录和自己保持一致。Leader在本地针对每个Follower维护了一个nextIndex,这里面记录了每个Follower和当前Leader的日志同步进度,当Follower发现AppendEntries RPC里面的日志进度和本地对照不上时,会拒绝同步请求,并告知Leader参与对比的本地日志相同term的第一条日志的索引,此时Leader会将该Follower的nextIndex设置为返回的索引减1,然后在下次请求的时候再次和Follower对比,直到找到Leader和Follower一致的那条日志记录,则将Follower自此位置后的所有日志删除,然后Leader发送在这之后的日志给Follower,从而实现新Leader和Follower的日志一致。
这当中其实可以看到一个问题就是我们不可能允许日志的无限增长,实际上Raft中的每个节点都会对本地日志进行压缩,个人觉得这里和Spark的checkpoint机制会有一点相似。就是Raft会选取一段日志,一般是针对个别数据的频繁操作日志段(压缩算法在选取被压缩对象时的策略其实都比较雷同),然后将这部分变化的最终状态保存成一个快照,最后再删除该段操作日志,由此达到缩减日志大小的目的。并且如果Leader在向Follower同步历史日志时,发现本地要同步的那段日志已经被自己压缩成了快照,则会发送一个InstallSnapshot RPC给Follower,进行快照同步。
以下是接收到AppendEntries RPC后的处理逻辑(码 2),我已经删除了一些部分,但内容还是比较长,供大家参考。
func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// 来源的Term小于本地Term,直接失败。
if a.Term < r.getCurrentTerm() {
return
}
//来源Term大于本地Term,并且本地节点不是Follower,则设置本地节点状态为Follower,更新本地Term。
if a.Term > r.getCurrentTerm() || r.getState() != Follower {
// Ensure transition to follower
r.setState(Follower)
r.setCurrentTerm(a.Term)
resp.Term = a.Term
}
//更新Leader地址
r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader)))
//校验prevLogEntry和prevLogTerm
if a.PrevLogEntry > 0 {
lastIdx, lastTerm := r.getLastEntry()
var prevLogTerm uint64
if a.PrevLogEntry == lastIdx {
//如果Leader上条日志编号和本地最后一条日志编号一样,
//则获取本地最后一条日志的Term。
prevLogTerm = lastTerm
} else {
//如果Leader上条日志编号和本地最后一条日志编号不一样,
//则获取本地编号为PrevLogEntry的Term。
var prevLog Log
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
r.logger.Warn("failed to get previous log",
"previous-index", a.PrevLogEntry,
"last-index", lastIdx,
"error", err)
resp.NoRetryBackoff = true
return
}
prevLogTerm = prevLog.Term
}
//将Leader和上面获取的本地Term对比,不一样则失败。
if a.PrevLogTerm != prevLogTerm {
r.logger.Warn("previous log term mis-match",
"ours", prevLogTerm,
"remote", a.PrevLogTerm)
resp.NoRetryBackoff = true
return
}
}
//如果AppendEntries RPC中的Entries字段不为空,即是一个日志同步请求。
if len(a.Entries) > 0 {
start := time.Now()
lastLogIdx, _ := r.getLastLog()
var newEntries []*Log
for i, entry := range a.Entries {
if entry.Index > lastLogIdx {
//如果传入的日志编号大,则对于Follower来讲,这是一条新日志。
newEntries = a.Entries[i:]
break
}
var storeEntry Log
if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
r.logger.Warn("failed to get log entry",
"index", entry.Index,
"error", err)
return
}
//如果传入的日志的Term和在Follower具有相同索引的日志的Term不一样,
//则先清理Follower节点上从传入日志索引之后的所有日志,
//在将传入的日志当做一条新日志处理。
if entry.Term != storeEntry.Term {
r.logger.Warn("clearing log suffix",
"from", entry.Index,
"to", lastLogIdx)
if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
r.logger.Error("failed to clear log suffix", "error", err)
return
}
if entry.Index <= r.configurations.latestIndex {
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
}
}
}
// 处理Leader已提交但Follower未提交的日志
if a.LeaderCommitIndex > 0 && a.LeaderCommitIndex > r.getCommitIndex() {
start := time.Now()
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
}
// Everything went well, set success
resp.Success = true
r.setLastContact()
return
}
// 码 2:AppendEntries RPC的处理逻辑 raft.go
3、安全性(safety)
我们先看一下下图(图 3)的情况,这是原文中列举的一个例子:
图 3:如果Term 2在时刻(c)被提交,则在时刻(d)出现了已提交的日志被覆盖的情况。
假设有五个节点S1至S5,按照从(a)到(d)的时间顺序执行:
(a)、S1为Leader(Term 2),将该日志同步到了S2。
(b)、S1挂掉,S5获得了S3、S4、S5节点的支持,从而当选为新Leader(Term 3)。
(c)、S5挂掉,S1重启,并当选为Leader(Term4),开始和Follower做数据对齐,将Term 2的日志同步到S3,然后此时发现Term 2的日志同步数量超过半数。
(d)、S1挂掉,S5重启,并当选Leader,开始数据对齐,使用Term 3的日志覆盖了Term 2的日志。
以上过程中,如果在(c)时刻,S1发现Term 2的日志同步已经超过半数了后提交了该日志,之后在(d)时刻就出现了已经被提交的Term 2日志被S5删除掉的状况,有悖于Raft协议的约束,所以提交前任日志的行为在Raft是不被允许的。实际过程为在(c)阶段,S1会自动提交Term 4之前的所有日志。实际过程如下所示(图 4),这样S5即便重启,也会因为其最后一条日志的Term 3小于S1、S2、S3的Term 4日志,而无法获得超过半数的选票而成为Leader,也就避免了Term 2日志的丢失。
图 4:Raft实际日志提交方式
大家是否觉得Raft要比ZAB还更加容易理解一些。下面给出一个官方制作的可交互动画,结合上面所学的内容,再看一下这个动画,大家就能更加形象的了解Raft了,便于加深记忆。