An Analysis of the Open-Source goraft Source Code
Source: Ding Kai
https://zhuanlan.zhihu.com/p/27415397
[Overview] Raft is a distributed consensus protocol, and goraft is a Go implementation of it. This article walks through the goraft source code in detail.
Data Structures
goraft is built around three main abstractions: server, peer, and log, which represent the local server node, a remote peer node in the cluster, and the log, respectively.
- server 
Raft is a protocol for keeping state consistent across multiple nodes, so a running cluster necessarily involves several physical nodes. The server struct abstracts one such node and maintains its state. Its definition is as follows:
type server struct {
    *eventDispatcher

    name        string
    path        string
    state       string
    transporter Transporter
    context     interface{}
    currentTerm uint64

    votedFor   string
    log        *Log
    leader     string
    peers      map[string]*Peer
    mutex      sync.RWMutex
    syncedPeer map[string]bool

    stopped           chan bool
    c                 chan *ev
    electionTimeout   time.Duration
    heartbeatInterval time.Duration

    snapshot *Snapshot

    // PendingSnapshot is an unfinished snapshot.
    // After the pendingSnapshot is saved to disk,
    // it will be set to snapshot and also will be
    // set to nil.
    pendingSnapshot *Snapshot

    stateMachine            StateMachine
    maxLogEntriesPerRequest uint64

    connectionString string

    routineGroup sync.WaitGroup
}
- state: each node is always in one of the following states: follower, candidate, or leader 
- currentTerm: a key Raft concept; each term has at most one leader 
- peers: every raft node needs to know about the other nodes in the cluster, in particular the leader 
- syncedPeer: on the leader, records which followers have already synced the log 
- c: the node's command channel; all commands are delivered through this channel 
- pendingSnapshot: an unfinished snapshot; once it has been saved to disk it becomes snapshot (see the comment in the struct above) 
- peer 
peer describes another node in the cluster. Its structure is as follows:
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
    server            *server
    Name              string `json:"name"`
    ConnectionString  string `json:"connectionString"`
    prevLogIndex      uint64
    stopChan          chan bool
    heartbeatInterval time.Duration
    lastActivity      time.Time
    sync.RWMutex
}
- server: some Peer methods depend on the server's state; for example, the peer's appendEntries logic needs the server's currentTerm 
- Name: the peer's name 
- prevLogIndex: a key field that records the index of the last log entry known to be on this peer; the leader sends this peer the log entries after that index 
- lastActivity: the last time this peer was active 
- log 
The log is the heart of Raft. Raft stores client commands in the log and keeps the state of all nodes consistent by replicating the log contents.
// A log is a collection of log entries that are persisted to durable storage.
type Log struct {
    ApplyFunc   func(*LogEntry, Command) (interface{}, error)
    file        *os.File
    path        string
    entries     []*LogEntry
    commitIndex uint64
    mutex       sync.RWMutex
    startIndex  uint64
    startTerm   uint64
    initialized bool
}
- ApplyFunc: the function that applies a log entry to the state machine; it is supplied by the application using raft 
- file: the log file handle 
- path: the log file path 
- entries: the in-memory cache of log entries 
- commitIndex: the commit point; entries up to this index have already been applied to the state machine 
- startIndex/startTerm: the index and term of the first entry in the log 
- log entry 
A log entry is what gets stored in the log file for each client command.
type LogEntry struct {
    Index            *uint64 `protobuf:"varint,1,req" json:"Index,omitempty"`
    Term             *uint64 `protobuf:"varint,2,req" json:"Term,omitempty"`
    CommandName      *string `protobuf:"bytes,3,req" json:"CommandName,omitempty"`
    Command          []byte  `protobuf:"bytes,4,opt" json:"Command,omitempty"`
    XXX_unrecognized []byte  `json:"-"`
}
// A log entry stores a single item in the log.
type LogEntry struct {
    pb       *protobuf.LogEntry
    Position int64 // position in the log file
    log      *Log
    event    *ev
}
- LogEntry is the in-memory representation of a log entry; what is actually written to the log file is its protocol-buffer encoding (the protobuf.LogEntry above) 
- Position is the entry's byte offset within the log file 
- The encoded entry contains the Index, the Term, the name of the original Command, and the Command's serialized payload 
Key Flows
- Client requests 
When using go-raft, the application first sets up the environment (creating and starting the raft server), which we will not go into in detail here; a brief setup sketch follows the example below. Let's look at how a client issues a request:
command := &raft.DefaultJoinCommand{}
if _, err := s.raftServer.Do(command); err != nil {
    http.Error(w, err.Error(), http.StatusInternalServerError)
    return
}
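Before Do can be called, the raft server itself has to be created and started. Purely for orientation, here is a minimal sketch loosely based on the goraft/raftd example; the transporter is assumed to be built elsewhere (e.g. an HTTP transporter), nil is passed for the state machine and context as raftd does, and constructor signatures may differ between goraft versions:

package cluster

import (
    "log"

    "github.com/goraft/raft"
)

// startNode creates and starts a raft server. The transporter t is assumed
// to be constructed by the caller; name, path and connectionString identify
// this node. This is a sketch, not the canonical setup.
func startNode(name, path, connectionString string, t raft.Transporter) raft.Server {
    s, err := raft.NewServer(name, path, t, nil, nil, connectionString)
    if err != nil {
        log.Fatal(err)
    }
    if err := s.Start(); err != nil {
        log.Fatal(err)
    }
    return s
}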
The entry point for executing a client command is Do:
func (s *server) Do(command Command) (interface{}, error) {
    return s.send(command)
}

// Sends an event to the event loop to be processed. The function will wait
// until the event is actually processed before returning.
func (s *server) send(value interface{}) (interface{}, error) {
    if !s.Running() {
        return nil, StopError
    }

    event := &ev{target: value, c: make(chan error, 1)}
    select {
    case s.c <- event:
    case <-s.stopped:
        return nil, StopError
    }

    select {
    case <-s.stopped:
        return nil, StopError
    case err := <-event.c:
        return event.returnValue, err
    }
}
The flow of send is straightforward: it pushes the command onto the server's command channel and then blocks until the command has been processed.
When the server is running as the leader, it sits in leaderLoop, which handles all client commands:
func (s *server) leaderLoop() {
    logIndex, _ := s.log.lastInfo()
    ......

    // Begin to collect response from followers
    for s.State() == Leader {
        select {
        case <-s.stopped:
            ......
        case e := <-s.c:
            switch req := e.target.(type) {
            // A client command
            case Command:
                s.processCommand(req, e)
                continue
            ......
            }
        }
    }
}
processCommand works as follows:
// Processes a command.
func (s *server) processCommand(command Command, e *ev) {
    s.debugln("server.command.process")

    // Create an entry for the command in the log.
    entry, err := s.log.createEntry(s.currentTerm, command, e)
    if err != nil {
        s.debugln("server.command.log.entry.error:", err)
        e.c <- err
        return
    }

    if err := s.log.appendEntry(entry); err != nil {
        s.debugln("server.command.log.error:", err)
        e.c <- err
        return
    }

    s.syncedPeer[s.Name()] = true
    if len(s.peers) == 0 {
        commitIndex := s.log.currentIndex()
        s.log.setCommitIndex(commitIndex)
        s.debugln("commit index ", commitIndex)
    }
}
The logic here is simple: create a log entry for the command and append it to the log file. If anything goes wrong, the error is written to e.c (e.c <- err), so the client waiting on that channel is notified and returns immediately.
If there is no error, the client is still blocked at this point. Although the leader has accepted the command, the corresponding log entry has not yet been replicated to a majority of the followers, so the command cannot be committed. The client therefore keeps waiting until the command is committed, which happens during log replication, as described below.
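The waiting here is just the standard Go pattern of sending a request together with a reply channel and blocking on that channel, which is exactly what ev implements (a target plus an error channel c). A stripped-down, self-contained illustration of the idea, with hypothetical names rather than goraft code:

package main

import "fmt"

// request carries a command plus a channel on which the processing loop
// reports the result, mirroring goraft's ev struct (target + error channel).
type request struct {
    command string
    done    chan error
}

func main() {
    reqs := make(chan *request)

    // The "leader loop": receives requests and answers on req.done
    // only once the command is considered committed.
    go func() {
        for req := range reqs {
            // ... replicate req.command to a majority here ...
            req.done <- nil // unblock the waiting client
        }
    }()

    // The "client": sends the request, then blocks until the loop replies.
    req := &request{command: "set x=1", done: make(chan error, 1)}
    reqs <- req
    if err := <-req.done; err != nil {
        fmt.Println("command failed:", err)
        return
    }
    fmt.Println("command committed")
}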
- Log replication 
In go-raft, the leader replicates log entries to the followers as part of the heartbeat:
// Listens to the heartbeat timeout and flushes an AppendEntries RPC.
func (p *Peer) heartbeat(c chan bool) {
    stopChan := p.stopChan

    c <- true

    ticker := time.Tick(p.heartbeatInterval)

    for {
        select {
        case flush := <-stopChan:
            if flush {
                // before we can safely remove a node
                // we must flush the remove command to the node first
                p.flush()
                return
            } else {
                return
            }
        case <-ticker:
            start := time.Now()
            p.flush()
            duration := time.Now().Sub(start)
            p.server.DispatchEvent(newEvent(HeartbeatEventType, duration, nil))
        }
    }
}
func (p *Peer) flush() {
    debugln("peer.heartbeat.flush: ", p.Name)
    prevLogIndex := p.getPrevLogIndex()
    term := p.server.currentTerm

    entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
    if entries != nil {
        p.sendAppendEntriesRequest(newAppendEntriesRequest(term, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
    } else {
        p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.snapshot))
    }
}
The core of the logic is that the leader packs its log entries into an AppendEntriesRequest and sends it to the follower. Only the entries the follower does not yet have are sent, i.e. the log entries after prevLogIndex.
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
    resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
    if resp == nil {
        p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil))
        return
    }
    p.setLastActivity(time.Now())

    // If successful then update the previous log index.
    p.Lock()
    if resp.Success() {
        ......
    }
    ......

    resp.peer = p.Name

    // Send response to server for processing.
    p.server.sendAsync(resp)
}
The follower's response to the heartbeat is then forwarded to the server, which handles this kind of message in leaderLoop:
func (s *server) leaderLoop() {
    logIndex, _ := s.log.lastInfo()
    ......

    // Begin to collect response from followers
    for s.State() == Leader {
        select {
        case e := <-s.c:
            switch req := e.target.(type) {
            case Command:
                s.processCommand(req, e)
                continue
            case *AppendEntriesRequest:
                e.returnValue, _ = s.processAppendEntriesRequest(req)
            case *AppendEntriesResponse:
                s.processAppendEntriesResponse(req)
            case *RequestVoteRequest:
                e.returnValue, _ = s.processRequestVoteRequest(req)
            }

            // Callback to event.
            e.c <- err
        }
    }

    s.syncedPeer = nil
}
Follower responses are handled in processAppendEntriesResponse:
func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
    // If we find a higher term then change to a follower and exit.
    if resp.Term() > s.Term() {
        s.updateCurrentTerm(resp.Term(), "")
        return
    }

    // panic response if it's not successful.
    if !resp.Success() {
        return
    }

    // if one peer successfully append a log from the leader term,
    // we add it to the synced list
    if resp.append == true {
        s.syncedPeer[resp.peer] = true
    }

    if len(s.syncedPeer) < s.QuorumSize() {
        return
    }

    // Determine the committed index that a majority has.
    var indices []uint64
    indices = append(indices, s.log.currentIndex())
    for _, peer := range s.peers {
        indices = append(indices, peer.getPrevLogIndex())
    }
    sort.Sort(sort.Reverse(uint64Slice(indices)))

    commitIndex := indices[s.QuorumSize()-1]
    committedIndex := s.log.commitIndex

    if commitIndex > committedIndex {
        s.log.sync()
        s.log.setCommitIndex(commitIndex)
    }
}
Once a quorum of nodes has acknowledged entries, the leader collects the current log index of every node (its own currentIndex plus each peer's prevLogIndex), sorts them in descending order, and takes the value at position QuorumSize-1. That index is already present on a majority of nodes and therefore becomes the new commit point, which is committed via setCommitIndex.
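The quorum arithmetic is easy to check in isolation. Here is a self-contained sketch of the same calculation (quorumCommitIndex is a hypothetical helper, not part of goraft):

package main

import (
    "fmt"
    "sort"
)

// quorumCommitIndex returns the highest log index that is present on a
// majority of nodes, given the leader's own index and each peer's
// acknowledged index (goraft uses peer.prevLogIndex for the latter).
func quorumCommitIndex(leaderIndex uint64, peerIndices []uint64, quorum int) uint64 {
    indices := append([]uint64{leaderIndex}, peerIndices...)
    // Sort in descending order; the value at position quorum-1 is then
    // guaranteed to be present on at least `quorum` nodes.
    sort.Slice(indices, func(i, j int) bool { return indices[i] > indices[j] })
    return indices[quorum-1]
}

func main() {
    // 5-node cluster (quorum = 3): leader at index 9, followers at 9, 7, 4, 3.
    fmt.Println(quorumCommitIndex(9, []uint64{9, 7, 4, 3}, 3)) // prints 7
}

With the commit point determined, setCommitIndex does the actual committing: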
// Updates the commit index and writes entries after that index to the stable storage.
func (l *Log) setCommitIndex(index uint64) error {
    l.mutex.Lock()
    defer l.mutex.Unlock()

    // this is not error any more after limited the number of sending entries
    // commit up to what we already have
    if index > l.startIndex+uint64(len(l.entries)) {
        index = l.startIndex + uint64(len(l.entries))
    }
    if index < l.commitIndex {
        return nil
    }

    for i := l.commitIndex + 1; i <= index; i++ {
        entryIndex := i - 1 - l.startIndex
        entry := l.entries[entryIndex]

        l.commitIndex = entry.Index()

        // Decode the command.
        command, err := newCommand(entry.CommandName(), entry.Command())
        if err != nil {
            return err
        }

        returnValue, err := l.ApplyFunc(entry, command)
        if entry.event != nil {
            entry.event.returnValue = returnValue
            entry.event.c <- err
        }

        _, isJoinCommand := command.(JoinCommand)
        if isJoinCommand {
            return nil
        }
    }
    return nil
}
Committing here mainly means advancing commitIndex and applying each entry's Command to the state machine. Finally, if a log entry originated directly from a client request (entry.event != nil), the state machine's return value is handed back to the client through event.c, which is what finally lets the client return; recall the client-request flow described above.
- Leader election 
While Raft is running, the leader periodically sends heartbeats to the followers. The heartbeat serves two purposes: it lets the followers confirm that the leader is still alive, and it carries the leader's log entries to the followers.
However, if a follower does not hear from the leader within a certain time, it assumes the leader may be down, turns itself into a candidate, and starts an election, asking the other nodes to vote for it.
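goraft randomizes this timeout: its afterBetween helper returns a channel that fires after a random duration between the election timeout and twice that value, which makes it unlikely that many followers become candidates at the same instant. A minimal sketch of such a randomized timeout channel (afterRandom is a hypothetical stand-in, not the goraft implementation):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// afterRandom returns a channel that fires once, after a random duration
// chosen uniformly from [min, max) -- the same idea as goraft's afterBetween.
func afterRandom(min, max time.Duration) <-chan time.Time {
    d := min + time.Duration(rand.Int63n(int64(max-min)))
    return time.After(d)
}

func main() {
    electionTimeout := 150 * time.Millisecond
    timeoutChan := afterRandom(electionTimeout, 2*electionTimeout)

    select {
    // case e := <-s.c:  // normally, incoming heartbeats/commands reset the wait
    case t := <-timeoutChan:
        fmt.Println("election timeout fired at", t, "-> become candidate")
    }
}

The corresponding goraft code is followerLoop: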
func (s *server) followerLoop() {
    since := time.Now()
    electionTimeout := s.ElectionTimeout()
    timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

    for s.State() == Follower {
        var err error
        update := false
        select {
        ......
        // No request was received before the timeout fired
        case <-timeoutChan:
            if s.promotable() {
                // Switch to the Candidate state
                s.setState(Candidate)
            } else {
                update = true
            }
        }
    }
    ......
}
// The main event loop for the server
func (s *server) loop() {
    defer s.debugln("server.loop.end")

    state := s.State()

    for state != Stopped {
        switch state {
        case Follower:
            s.followerLoop()
        // After the state becomes Candidate, enter candidateLoop
        case Candidate:
            s.candidateLoop()
        case Leader:
            s.leaderLoop()
        case Snapshotting:
            s.snapshotLoop()
        }
        state = s.State()
    }
}
Once a node has switched from Follower to Candidate, it enters candidateLoop to start an election.
func (s *server) candidateLoop() {
    for s.State() == Candidate {
        if doVote {
            s.currentTerm++
            s.votedFor = s.name

            // Send a vote request to every other node
            respChan = make(chan *RequestVoteResponse, len(s.peers))
            for _, peer := range s.peers {
                s.routineGroup.Add(1)
                go func(peer *Peer) {
                    defer s.routineGroup.Done()
                    peer.sendVoteRequest(newRequestVoteRequest(s.currentTerm, s.name, lastLogIndex, lastLogTerm), respChan)
                }(peer)
            }

            // Vote for ourselves
            votesGranted = 1
            timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
            doVote = false
        }

        // If a majority of nodes grant us their vote, become the leader
        if votesGranted == s.QuorumSize() {
            s.setState(Leader)
            return
        }
        // Wait for the other nodes' responses to our vote requests
        select {
        case <-s.stopped:
            s.setState(Stopped)
            return
        case resp := <-respChan:
            if success := s.processVoteResponse(resp); success {
                votesGranted++
            }
        ......
        case <-timeoutChan:
            // If the election timed out again, start a new round of voting
            doVote = true
        }
    }
}
Although there is a fair amount of code above, the logic is very clear, so we will not dwell on it.
The above shows how a follower turns into a candidate and starts an election. Now let's look at how a node handles a vote request coming from another node:
// Processes a "request vote" request.
func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
    if req.Term < s.Term() {
        return newRequestVoteResponse(s.currentTerm, false), false
    }

    if req.Term > s.Term() {
        s.updateCurrentTerm(req.Term, "")
    } else if s.votedFor != "" && s.votedFor != req.CandidateName {
        return newRequestVoteResponse(s.currentTerm, false), false
    }

    lastIndex, lastTerm := s.log.lastInfo()
    if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
        return newRequestVoteResponse(s.currentTerm, false), false
    }

    s.votedFor = req.CandidateName
    return newRequestVoteResponse(s.currentTerm, true), true
}
Granting a remote node's vote request requires all of the following conditions (summarized in the sketch after the list):
- the remote node's term must be greater than or equal to the current node's term; 
- the remote node's log must be at least as up-to-date as the current node's; 
- if the remote node's term equals the current node's term, the current node must not have already voted for a different candidate. 
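The same rule can be written as a standalone predicate. A self-contained sketch that mirrors the checks in processRequestVoteRequest above (grantVote is a hypothetical helper, not goraft code):

package main

import "fmt"

// grantVote reports whether a node should grant its vote, given its own
// term/votedFor/log position and the candidate's request fields.
func grantVote(myTerm, myLastLogIndex, myLastLogTerm uint64, votedFor string,
    reqTerm, reqLastLogIndex, reqLastLogTerm uint64, candidate string) bool {
    if reqTerm < myTerm {
        return false // stale candidate
    }
    // Same term: we may only vote once per term.
    if reqTerm == myTerm && votedFor != "" && votedFor != candidate {
        return false
    }
    // The candidate's log must be at least as up-to-date as ours.
    if myLastLogIndex > reqLastLogIndex || myLastLogTerm > reqLastLogTerm {
        return false
    }
    return true
}

func main() {
    // Candidate "node-c" at term 5 with log (index 10, term 4); we are at
    // term 5 but already voted for "node-b", so the vote is rejected.
    fmt.Println(grantVote(5, 9, 4, "node-b", 5, 10, 4, "node-c")) // false
}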
The whole flow is quite simple.
- Membership changes 
In Raft, membership changes are themselves handled as client commands that go through the consensus protocol: the change command is written to the leader's log, replicated to the followers, and committed by the leader once a majority of followers have written it.
go-raft provides two membership-change commands:
DefaultJoinCommand and DefaultLeaveCommand
The key to how these two commands are handled is their Apply methods:
func (c *DefaultJoinCommand) Apply(server Server) (interface{}, error) {
    err := server.AddPeer(c.Name, c.ConnectionString)
    return []byte("join"), err
}
func (c *DefaultLeaveCommand) Apply(server Server) (interface{}, error) {
    err := server.RemovePeer(c.Name)
    return []byte("leave"), err
}
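Because these are ordinary commands, adding or removing a node is just another Do call against the leader, exactly like the client request shown earlier. A hedged usage sketch (the addNode/removeNode helpers and their arguments are placeholders; only DefaultJoinCommand, DefaultLeaveCommand and Do come from goraft):

package cluster

import "github.com/goraft/raft"

// addNode submits a DefaultJoinCommand on the leader, exactly like any other
// client command; leader, name and connStr are supplied by the caller.
func addNode(leader raft.Server, name, connStr string) error {
    _, err := leader.Do(&raft.DefaultJoinCommand{
        Name:             name,
        ConnectionString: connStr,
    })
    return err
}

// removeNode submits a DefaultLeaveCommand in the same way.
func removeNode(leader raft.Server, name string) error {
    _, err := leader.Do(&raft.DefaultLeaveCommand{Name: name})
    return err
}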
Adding a node ultimately goes through AddPeer:
func (s *server) AddPeer(name string, connectiongString string) error {
    if s.peers[name] != nil {
        return nil
    }

    if s.name != name {
        peer := newPeer(s, name, connectiongString, s.heartbeatInterval)

        // If the peer is added on the leader, also start the background
        // goroutine that sends it heartbeats
        if s.State() == Leader {
            peer.startHeartbeat()
        }

        s.peers[peer.Name] = peer

        s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
    }

    // Write the configuration to file.
    s.writeConf()

    return nil
}
// Removes a peer from the server.
func (s *server) RemovePeer(name string) error {
    // Skip the Peer if it has the same name as the Server
    if name != s.Name() {
        // Return error if peer doesn't exist.
        peer := s.peers[name]
        if peer == nil {
            return fmt.Errorf("raft: Peer not found: %s", name)
        }

        // If we are the leader, stop the heartbeat goroutine for the removed node
        if s.State() == Leader {
            s.routineGroup.Add(1)
            go func() {
                defer s.routineGroup.Done()
                peer.stopHeartbeat(true)
            }()
        }

        delete(s.peers, name)

        s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
    }

    // Write the configuration to file.
    s.writeConf()

    return nil
}
- Snapshot 
As the Raft paper notes, the log of commands keeps growing as the system runs. To keep it bounded, the paper introduces snapshots. The idea behind snapshotting is simple: discard log entries that are no longer needed. That raises two questions:
- Which log entries are no longer needed and can be discarded? 
- How do we discard them? 
Let's take them one at a time:
- A log entry whose command has already been applied to the state machine is no longer needed and can be reclaimed. 
- Because log entries are committed in index order, knowing the replica's commit index is enough: every entry at or before that index has been committed, so everything up to and including the commit point can be deleted. In practice, the entries after the commit point are written to a new log file and the old file is removed (see the sketch after this list). 
- One caveat: before reclaiming the log, the current state machine must be saved. Otherwise, if the state machine's data is lost after the log has been deleted, the state can no longer be recovered. 
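A minimal, self-contained sketch of that write-new-file-then-swap idea, using hypothetical types and plain-text entries; goraft's real Log.compact works on its entries slice and binary log file, but the shape described above is the same:

package main

import (
    "fmt"
    "os"
)

// compact keeps only the entries after commitIndex by writing them to a new
// file and then replacing the old one -- the "new file + delete old file"
// idea described above. Entries are one line per entry purely for illustration.
func compact(path string, entries []string, commitIndex int) ([]string, error) {
    remaining := entries[commitIndex:] // entries after the commit point

    tmp := path + ".new"
    f, err := os.Create(tmp)
    if err != nil {
        return nil, err
    }
    for _, e := range remaining {
        fmt.Fprintln(f, e)
    }
    if err := f.Close(); err != nil {
        return nil, err
    }

    // Replace the old log file with the compacted one.
    if err := os.Rename(tmp, path); err != nil {
        return nil, err
    }
    return remaining, nil
}

func main() {
    entries := []string{"set a=1", "set b=2", "set c=3", "set d=4"}
    kept, err := compact("example.log", entries, 3) // the first 3 entries are committed
    fmt.Println(kept, err)                          // [set d=4] <nil>
}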
In goraft, snapshots are triggered by the application, which calls the server's TakeSnapshot:
func (s *server) TakeSnapshot() error {
    ......
    lastIndex, lastTerm := s.log.commitInfo()
    ......

    path := s.SnapshotPath(lastIndex, lastTerm)
    s.pendingSnapshot = &Snapshot{lastIndex, lastTerm, nil, nil, path}

    // First, have the application save the state machine's current state
    state, err := s.stateMachine.Save()
    if err != nil {
        return err
    }

    // Prepare the snapshot state: the current log index, the current peers, and so on
    peers := make([]*Peer, 0, len(s.peers)+1)
    for _, peer := range s.peers {
        peers = append(peers, peer.clone())
    }
    s.pendingSnapshot.Peers = peers
    s.pendingSnapshot.State = state
    s.saveSnapshot()

    // Finally, reclaim log entries with s.log.compact()
    if lastIndex-s.log.startIndex > NumberOfLogEntriesAfterSnapshot {
        compactIndex := lastIndex - NumberOfLogEntriesAfterSnapshot
        compactTerm := s.log.getEntry(compactIndex).Term()
        s.log.compact(compactIndex, compactTerm)
    }

    return nil
}
We will not go into the compact() function in detail here; it is very short, and interested readers can read it on their own.