vlambda博客
学习文章列表

手撕 hashicorp/raft 算法【万字长文】

本文是第 104 期手撕 hashicrop/raft 算法的文字版,建议搭配着视频(本文文末)一起阅读。
编辑:YouEclipse

概览(00:00~01:11)

欢迎收看今天的分享,我是黄威,是一名趣头条的后端工程师.今天跟大家分享的主要是关于 raft 算法的一个工程实践,就是它用 go 语言是怎样实现的,跟大家分享一下.

首先看一下今天大概的内容,主要是 raft 协议的简介,还有选举过程和日志复制过程的简单介绍,然后是官方的动画的演示,再接着就是源码的讲解,就是hashicorp/raft这个库源码的讲解,然后剩下的就是场景的一些调试,到时候我会修改一些 hashicorp/raft 源码,修改一些日志,到时候方便大家 Debug。最后就是 QA,大概今天是这些内容。

Raft 简介(01:12~03:08)

Raft 选举的过程(03:09~05:56)

先来跟大家,简单介绍一下选举的这个过程吧:

首先是以单个节点的视角,就是说,你的一个节点,应该都是这三个角色(follower,canidate,leader)中的一个,并且是在这三个角色中变换。你的集群(节点)起来时应该是一个 follower 的一个状态,然后经过一个 timeout 时间变成 candidate (候选者),然后变成 candidate 以后,你会向其他节点发送投票的请求,如果说你收到一个 majority 或者 quorum 数量的同意投票的请求,那么你就会提升为 leader,这个 majority 或者 quorum 其实就是你节点集群中,大多数节点的数量,比如说你三个结点就是两个节点,四个的话就是三个,五个的话就是就是也是三个,这样子就是取其中大多数节点,如果说同意你的投票请求,那么你就会提升为 leader,然后 leader 会因为一些情况,比如分区啊,或者说联系不上 follower 等,还有一其他的情况,会变成 follower,这个是大概相当于你的那个一个节点,你可能会在这些状态里面来回的切换。

手撕 hashicorp/raft 算法【万字长文】

然后对于你整个集群的话,你的集群集的状态就是要么在选举,要么就是在选举完成以后正常提供服务的一个状态,蓝颜色的就是在选举,绿色就是选举完成以后,正常提供服务的状态,比如说你集群刚起来肯定在一个 term1, 选举的一个过程;选举出来以后,它就是正常服务的状态,然后到了 term2,就是可能因为某些原因,然后重新选举了,然后又选举出来,然后又重新选举 term3,term3 这里,可能这个任期没有选举出,经历了一个 timeout 的时间,我们可以看上一张图,当前的 candidate 候选者,在自己当前轮没有收到大多数节点的统一的请求,也没有收到其他 leader 对自己的一些心跳的通知,那么它会进入进入下一轮的选举,就是下面这张图的 term4,然后通过选举完成以后,然后又进入一个正常的一个状态,这个大概就是选举的一个过程。

Raft 日志复制的流程(05:56~09:18)

接下来是关于日志复制的一个流程,也跟大家简单介绍一下,首先讲一下日志的格式。

日志格式

日志格式:term + index + cmd + type

日志的格式的话是有任期,索引还有数据,cmd,还有就是 type.这里可以给大家直接看下数据结构:

hashicorp/raft/log.go

// Log entries are replicated to all members of the Raft cluster
// and form the heart of the replicated state machine.
type Log struct {
 // Index holds the index of the log entry.
 Index uint64

 // Term holds the election term of the log entry.
 Term uint64

 // Type holds the type of the log entry.
 Type LogType

 // Data holds the log entry's type-specific data.
 Data []byte

 // Extensions holds an opaque byte slice of information for middleware. It
 // is up to the client of the library to properly modify this as it adds
 // layers and remove those layers when appropriate. This value is a part of
 // the log, so very large values could cause timing issues.
 //
 // N.B. It is _up to the client_ to handle upgrade paths. For instance if
 // using this with go-raftchunking, the client should ensure that all Raft
 // peers are using a version that can handle that extension before ever
 // actually triggering chunking behavior. It is sometimes sufficient to
 // ensure that non-leaders are upgraded first, then the current leader is
 // upgraded, but a leader changeover during this process could lead to
 // trouble, so gating extension behavior via some flag in the client
 // program is also a good idea.
 Extensions []byte
}

你看它的定义的数据结构,其实也是一样,有 index,term,type,data,下面是扩展信息不用管。定义的这些字段上保存 log。

手撕 hashicorp/raft 算法【万字长文】

这张图上面一排是索引,索引的位置第一个位置,第二个位置,然后到一直到 100 多,下面的框里面这个数字,中间的这个数字是代表任期,比如这个第一个任期,第二个任期,第三个任期, X 等于 3,相当于是数据,就是这个论文里面,它其实是以 KV 服务来举例的,那他保存的肯定是一些 kv 的一些命令。这个就是 X 等于 3,y 等于 1, X 等于 2,第 2 个任期提交,以 4 这个位置提交的 x 等于 2,在位置 8 这个位置提交的 X=4...

这大概是一个日志的格式,并且就是说,如果说日志复制到了大多数的节点上,那么他就会提交,所以这里面这张图的话,他 1 到 4 这边 1 到 7 这个位置就是,被提交了的。

请求处理整体流程

手撕 hashicorp/raft 算法【万字长文】

然后就是提交整体的处理流程。比如你一个客户端发送一个发送请求,发送到 leader 的服务器,然后会通过你编写的一个 raft 的模块,跟底层的 raft 的库进行交互,你先把日志存起来,然后在并行的发送给其他的节点。如果你收到大多数的成功的响应,那么你肯定就可以提交这个日志。第三步会写到状态机,然后第四步就返回给客户端,就是大于大致的一个流程.

请求处理详细流程(重点)

下面是一个详细的流程,到时候后面我们也会根据这个图来 Debug

手撕 hashicorp/raft 算法【万字长文】


我会把这里面每一步执行的日志都会打印出来,并且会带着大家分析一下,就是打印日志每一步的上下文,这样的话大家就可以更实际地理解 raft 的实现,会有更直观的一个认识,这个图我后面再反过来跟大家讲一下。

Raft 协议动画演示(09:19~12:40)

选举和日志复制的流程的话,跟大家简单的过了一下。

这里有有两个官方的动画的演示(link1 http://thesecretlivesofdata.com/raft,link2 https://raft.github.io/#implementations),第一个图就不跟大家演示了,就直接看第二个吧。它是模拟 raft 集群的实时的状态,比如说你的集群刚起来,现在都是橘黄色,都是一个 follower 的状态。然后让他继续运行,你看它这个圈就是外层的灰色的线,它是代表一个选举超时,就是 election timeout,所以说如果 follower 没有收到 leader 的请求的话,那么他就会经历一个 timeout 自动的提升为 candidate,然后就会发送投票请求,如果说收到了大多数节点的同意的响应,那么它就会成为 leader。哪个先到 candidate 就会发送投票请求,肯定只会有一个收到大多数的节点的投票的请求,肯定不会有两个的。现在这个 S5 这个服务器变成了 leader,然后他就一直保持心跳,发送心跳的信息。我们先来给他发送一个请求,他会先写到自己的日志,然后我们再让他执行,会被发送到其他的的节点。其他的节点复制完成以后,他就可以返回给客户端请求处理成功。应该是提交信息以后。S5 第二个任期这里,现在就变成实线了,那说明他提交了,接着,它就会发送提交的信息给其它的节点,那其他节点也提交了.

我们再来看一下这个过程: 你自己复制给,其他所有的也复制,然后就自己提交,然后在其他的也提交;如果说我们把它宕掉,会发生什么呢?然后没有心跳,其他收不到,然后就会自动触发选举的流程。然后就会选举出一个新的 leader,现在是 S3,然后我们给他一个请求,也是一样的,但是现在 s2 收不到日志的,日志是落后的,但是没关系,我们把它恢复起来,然后他就会很快就会赶上来。

这个大概就是一个 raft 的选举和复制日志的一个流程动画演示。大家有兴趣啊,到时候也可以自己去看一下。

完整讲解 hashicorp/raft(12:40~28:18)

初始化

我们先从这个 main 函数里面开始吧,先看一下这个服务是怎样启动起来的,然后我们在进入 raft 的源码去看一下。

vision9527/raft-demo/main.go

func main() {
 flag.Parse()
 // 初始化配置
 if httpAddr == "" || raftAddr == "" || raftId == "" || raftCluster == "" {
  fmt.Println("config error")
  os.Exit(1)
  return
 }
 raftDir := "node/raft_" + raftId
 os.MkdirAll(raftDir, 0700)

 // 初始化raft
 myRaft, fm, err := myraft.NewMyRaft(raftAddr, raftId, raftDir)
 if err != nil {
  fmt.Println("NewMyRaft error ", err)
  os.Exit(1)
  return
 }

 // 启动raft
 myraft.Bootstrap(myRaft, raftId, raftAddr, raftCluster)

 // 监听leader变化 (其实这个的话在实际的工程工作实际的开发中,其实应该是不常用的这里面。为了演示简单,然后用了一下)
 go func() {
  for leader := range myRaft.LeaderCh() {
   isLeader = leader
  }
 }()

 // 启动http server
 httpServer := HttpServer{
  ctx: myRaft,
  fsm: fm,
 }

 http.HandleFunc("/set", httpServer.Set)
 http.HandleFunc("/get", httpServer.Get)
 http.ListenAndServe(httpAddr, nil)

 // 关闭raft
 shutdownFuture := myRaft.Shutdown()
 if err := shutdownFuture.Error(); err != nil {
  fmt.Printf("shutdown raft error:%v \n", err)
 }

 // 退出http server
 fmt.Println("shutdown kv http server")
}

初始化一些,配置也是从命令行参数读取,然后第二个就是初始化 raft,里面就是一些 raft 初始化的信息。这个就是启动 raft,然后这个就是进行一点点变化,其实这个的话在实际的工程工作实际的开发中,其实应该是不常用的这里面。为了演示简单,然后用了一下。接着就是其中一个 httpserv,他到这里会阻塞。这里是两个 handler,一个是 set,一个是 get,set 是直接执行命令的一个入口,get 就是简单的一个 get.下面就是关闭了。正常的话应该是到这里阻塞住,如果说我也退出了,他会继续往下.

我们现在主要就是看这两部分(myraft.NewMyRaft 和 myraft.Bootstrap):

vision9527/raft-demo/myraft/my_raft.go

func NewMyRaft(raftAddr, raftId, raftDir string) (*raft.Raft, *fsm.Fsm, error) {
 config := raft.DefaultConfig()
 config.LocalID = raft.ServerID(raftId)
 // config.HeartbeatTimeout = 1000 * time.Millisecond
 // config.ElectionTimeout = 1000 * time.Millisecond
 // config.CommitTimeout = 1000 * time.Millisecond

 addr, err := net.ResolveTCPAddr("tcp", raftAddr)
 if err != nil {
  return nil, nil, err
 }
 transport, err := raft.NewTCPTransport(raftAddr, addr, 2, 5*time.Second, os.Stderr)
 if err != nil {
  return nil, nil, err
 }
 snapshots, err := raft.NewFileSnapshotStore(raftDir, 2, os.Stderr)
 if err != nil {
  return nil, nil, err
 }
 logStore, err := raftboltdb.NewBoltStore(filepath.Join(raftDir, "raft-log.db"))
 if err != nil {
  return nil, nil, err
 }
 stableStore, err := raftboltdb.NewBoltStore(filepath.Join(raftDir, "raft-stable.db"))
 if err != nil {
  return nil, nil, err
 }
 fm := new(fsm.Fsm)
 fm.Data = make(map[string]string)

 rf, err := raft.NewRaft(config, fm, logStore, stableStore, snapshots, transport)
 if err != nil {
  return nil, nil, err
 }

 return rf, fm, nil
}

hashicorp/raft/api.go

// NewRaft is used to construct a new Raft node. It takes a configuration, as well
// as implementations of various interfaces that are required. If we have any
// old state, such as snapshots, logs, peers, etc, all those will be restored
// when creating the Raft node.
func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps SnapshotStore, trans Transport) (*Raft, error) {
 // Validate the configuration.
 if err := ValidateConfig(conf); err != nil {
  return nil, err
 }

 // Ensure we have a LogOutput.
 var logger hclog.Logger
 if conf.Logger != nil {
  logger = conf.Logger
 } else {
  if conf.LogOutput == nil {
   conf.LogOutput = os.Stderr
  }

  logger = hclog.New(&hclog.LoggerOptions{
   Name:   "raft",
   Level:  hclog.LevelFromString(conf.LogLevel),
   Output: conf.LogOutput,
  })
 }

 // Try to restore the current term.
 currentTerm, err := stable.GetUint64(keyCurrentTerm)
 if err != nil && err.Error() != "not found" {
  return nil, fmt.Errorf("failed to load current term: %v", err)
 }

 // Read the index of the last log entry.
 lastIndex, err := logs.LastIndex()
 if err != nil {
  return nil, fmt.Errorf("failed to find last log: %v", err)
 }

 // Get the last log entry.
 var lastLog Log
 if lastIndex > 0 {
  if err = logs.GetLog(lastIndex, &lastLog); err != nil {
   return nil, fmt.Errorf("failed to get last log at index %d: %v", lastIndex, err)
  }
 }

 // Make sure we have a valid server address and ID.
 protocolVersion := conf.ProtocolVersion
 localAddr := ServerAddress(trans.LocalAddr())
 localID := conf.LocalID

 // TODO (slackpad) - When we deprecate protocol version 2, remove this
 // along with the AddPeer() and RemovePeer() APIs.
 if protocolVersion < 3 && string(localID) != string(localAddr) {
  return nil, fmt.Errorf("when running with ProtocolVersion < 3, LocalID must be set to the network address")
 }

 // Create Raft struct.
 r := &Raft{
  protocolVersion:       protocolVersion,
  applyCh:               make(chan *logFuture),
  conf:                  *conf,
  fsm:                   fsm,
  fsmMutateCh:           make(chan interface{}, 128),
  fsmSnapshotCh:         make(chan *reqSnapshotFuture),
  leaderCh:              make(chan bool1),
  localID:               localID,
  localAddr:             localAddr,
  logger:                logger,
  logs:                  logs,
  configurationChangeCh: make(chan *configurationChangeFuture),
  configurations:        configurations{},
  rpcCh:                 trans.Consumer(),
  snapshots:             snaps,
  userSnapshotCh:        make(chan *userSnapshotFuture),
  userRestoreCh:         make(chan *userRestoreFuture),
  shutdownCh:            make(chan struct{}),
  stable:                stable,
  trans:                 trans,
  verifyCh:              make(chan *verifyFuture, 64),
  configurationsCh:      make(chan *configurationsFuture, 8),
  bootstrapCh:           make(chan *bootstrapFuture),
  observers:             make(map[uint64]*Observer),
  leadershipTransferCh:  make(chan *leadershipTransferFuture, 1),
 }

 // Initialize as a follower.
 r.setState(Follower)

 // Restore the current term and the last log.
 r.setCurrentTerm(currentTerm)
 r.setLastLog(lastLog.Index, lastLog.Term)

 // Attempt to restore a snapshot if there are any.
 if err := r.restoreSnapshot(); err != nil {
  return nil, err
 }

 // Scan through the log for any configuration change entries.
 snapshotIndex, _ := r.getLastSnapshot()
 for index := snapshotIndex + 1; index <= lastLog.Index; index++ {
  var entry Log
  if err := r.logs.GetLog(index, &entry); err != nil {
   r.logger.Error("failed to get log""index", index, "error", err)
   panic(err)
  }
  r.processConfigurationLogEntry(&entry)
 }
 r.logger.Info("initial configuration",
  "index", r.configurations.latestIndex,
  "servers", hclog.Fmt("%+v", r.configurations.latest.Servers))

 // Setup a heartbeat fast-path to avoid head-of-line
 // blocking where possible. It MUST be safe for this
 // to be called concurrently with a blocking RPC.
 trans.SetHeartbeatHandler(r.processHeartbeat)

 if conf.skipStartup {
  return r, nil
 }
 // Start the background work.
 r.goFunc(r.run)
 r.goFunc(r.runFSM)
 r.goFunc(r.runSnapshots)
 return r, nil
}

它的启动流程的话就是先校验一下配置,然后 log 的初始化,然后是获取当前的任期,这个任期的话必须得持久化。大家可以看一下这个论文里面这张图。

这张图的话应该说是描述的就是整个 raft 的算法,就是你的数据结构要怎么样设置?然后有那些 RPC 的方法,就是投票和复制日志的 RPC 方法,你应该实现什么样的逻辑啊,在这里都有。这个(state)呢,比如说你要持久化的话,他肯定会持久化当前的任期,还有投票,这里就会取出来。这个(logs.LastIndex())是获取最新日志的索引,接着就是构造 raft 的实例。这个(r.setState(Follower))是设置角色,就是你的集群起来,肯定都是以 follower 的角色起来。这里(setCurrentTerm)设置当前任期,这个(restoreSnapshot)是从快照里面恢复一些数据。这个(processConfigurationLogEntry)是对日志里面一些配置类型的 log 进行处理。然后这里有个 SetHeartbeatHandler的 handler,这里是这样的,正常的话你的 heartbeat 应该是跟 AppendEntries RPC 是同一个 handler,但是一个问题就是说,如果说你的前面一个 AppendEntry 执行很久,然后后面 heartbeat 如果发送请求的话,可能会阻塞,所以就没有达到 heartbeat 或者 ping 的这个效果,就会有问题,所以它是单独初始化了一个 handler,然后,但是呢,他的逻辑是跟这个(AppendEntry)一样的,他们用的都是同一个方法,这个大家可以注意一下.下面就是这,里面就是 raft 算法的核心实现了。

raft 的监听事件

hashicorp/raft/raft.go

// run is a long running goroutine that runs the Raft FSM.
func (r *Raft) run() {
 for {
  // Check if we are doing a shutdown
  select {
  case <-r.shutdownCh:
   // Clear the leader to prevent forwarding
   r.setLeader("")
   return
  default:
  }

  // Enter into a sub-FSM
  switch r.getState() {
  case Follower:
   r.runFollower()
  case Candidate:
   r.runCandidate()
  case Leader:
   r.runLeader()
  }
 }
}

我们接下来进入就是讲 raft 的监听事件,到时候如果大家看这个库的话,肯定也是以这里为入口,这里就开始 raft 的逻辑了,这是 follower 的逻辑和 candidate 的逻辑,还有 leader 的逻辑,其实比较清晰了。

我们先看 runFollower 吧,其中一个都会监听一些 channel,然后执行执行一些逻辑,

// runFollower runs the FSM for a follower.
func (r *Raft) runFollower() {
 didWarn := false
 r.logger.Info("entering follower state""follower", r, "leader", r.Leader())
 metrics.IncrCounter([]string{"raft""state""follower"}, 1)
 heartbeatTimer := randomTimeout(r.conf.HeartbeatTimeout)

 for r.getState() == Follower {
  select {
  case rpc := <-r.rpcCh:
   r.processRPC(rpc)

  case c := <-r.configurationChangeCh:
   // Reject any operations since we are not the leader
   c.respond(ErrNotLeader)

  case a := <-r.applyCh:
   // Reject any operations since we are not the leader
   a.respond(ErrNotLeader)

  case v := <-r.verifyCh:
   // Reject any operations since we are not the leader
   v.respond(ErrNotLeader)

  case r := <-r.userRestoreCh:
   // Reject any restores since we are not the leader
   r.respond(ErrNotLeader)

  case r := <-r.leadershipTransferCh:
   // Reject any operations since we are not the leader
   r.respond(ErrNotLeader)

  case c := <-r.configurationsCh:
   c.configurations = r.configurations.Clone()
   c.respond(nil)

  case b := <-r.bootstrapCh:
   b.respond(r.liveBootstrap(b.configuration))

  case <-heartbeatTimer:
   // Restart the heartbeat timer
   heartbeatTimer = randomTimeout(r.conf.HeartbeatTimeout)

   // Check if we have had a successful contact
   lastContact := r.LastContact()
   if time.Now().Sub(lastContact) < r.conf.HeartbeatTimeout {
    continue
   }

   // Heartbeat failed! Transition to the candidate state
   lastLeader := r.Leader()
   r.setLeader("")

   if r.configurations.latestIndex == 0 {
    if !didWarn {
     r.logger.Warn("no known peers, aborting election")
     didWarn = true
    }
   } else if r.configurations.latestIndex == r.configurations.committedIndex &&
    !hasVote(r.configurations.latest, r.localID) {
    if !didWarn {
     r.logger.Warn("not part of stable configuration, aborting election")
     didWarn = true
    }
   } else {
    r.logger.Warn("heartbeat timeout reached, starting election""last-leader", lastLeader)
    metrics.IncrCounter([]string{"raft""transition""heartbeat_timeout"}, 1)
    r.setState(Candidate)
    return
   }

  case <-r.shutdownCh:
   return
  }
 }
}

对于 follower 来说,其实最重要的两个其实就是 AppendEntries RPC 和投票请求(processRPC)。

还有一个监听的重要 channel,就是这个 heartbeatTimer,就是说说你的集群起来以后应该有一个 timeout 的时间,这个就是一个 channel,经过 timeout,它会发送一个通知。当你 check,如果说没 checkt 没过的话,他会去继续执行到下面,然后执行到这儿(r.setState(Candidate))变成一个 candidate,然后 return 退出这个函数,然后我们再回到这里,你看他就是 return 出来的。下次他进来就是 candidate。

candidate 里面有什么东西呢,其实也很简单,就是投票的一些信息,可以去看一下。

// runCandidate runs the FSM for a candidate.
func (r *Raft) runCandidate() {
 r.logger.Info("entering candidate state""node", r, "term", r.getCurrentTerm()+1)
 metrics.IncrCounter([]string{"raft""state""candidate"}, 1)

 // Start vote for us, and set a timeout
 voteCh := r.electSelf()

 // Make sure the leadership transfer flag is reset after each run. Having this
 // flag will set the field LeadershipTransfer in a RequestVoteRequst to true,
 // which will make other servers vote even though they have a leader already.
 // It is important to reset that flag, because this priviledge could be abused
 // otherwise.
 defer func() { r.candidateFromLeadershipTransfer = false }()

 electionTimer := randomTimeout(r.conf.ElectionTimeout)

 // Tally the votes, need a simple majority
 grantedVotes := 0
 votesNeeded := r.quorumSize()
 r.logger.Debug("votes""needed", votesNeeded)

 for r.getState() == Candidate {
  select {
  case rpc := <-r.rpcCh:
   r.processRPC(rpc)

  case vote := <-voteCh:
   // Check if the term is greater than ours, bail
   if vote.Term > r.getCurrentTerm() {
    r.logger.Debug("newer term discovered, fallback to follower")
    r.setState(Follower)
    r.setCurrentTerm(vote.Term)
    return
   }

   // Check if the vote is granted
   if vote.Granted {
    grantedVotes++
    r.logger.Debug("vote granted""from", vote.voterID, "term", vote.Term, "tally", grantedVotes)
   }

   // Check if we've become the leader
   if grantedVotes >= votesNeeded {
    r.logger.Info("election won""tally", grantedVotes)
    r.setState(Leader)
    r.setLeader(r.localAddr)
    return
   }

  case c := <-r.configurationChangeCh:
   // Reject any operations since we are not the leader
   c.respond(ErrNotLeader)

  case a := <-r.applyCh:
   // Reject any operations since we are not the leader
   a.respond(ErrNotLeader)

  case v := <-r.verifyCh:
   // Reject any operations since we are not the leader
   v.respond(ErrNotLeader)

  case r := <-r.userRestoreCh:
   // Reject any restores since we are not the leader
   r.respond(ErrNotLeader)

  case r := <-r.leadershipTransferCh:
   // Reject any operations since we are not the leader
   r.respond(ErrNotLeader)

  case c := <-r.configurationsCh:
   c.configurations = r.configurations.Clone()
   c.respond(nil)

  case b := <-r.bootstrapCh:
   b.respond(ErrCantBootstrap)

  case <-electionTimer:
   // Election failed! Restart the election. We simply return,
   // which will kick us back into runCandidate
   r.logger.Warn("Election timeout reached, restarting election")
   return

  case <-r.shutdownCh:
   return
  }
 }
}

先给自己投票(electSelf),然后并行地发送给大家,就是遍历一下当时有哪些服务器,除了自己以外,然后都发出一个,然后返回一个 chanel(voteCh),随时监听这个投票请求的结果。它也需要监听 RPC,因为它也是会收到复制日志的请求,投票的请求,会做相应的处理,并且它主要的工作就是监听这个投票的结果。如果说他的他投票结果,如果说有一个同意了,那么他就会+1,直到他大于等于他的法定人数,或者说 majority 人数的数量,它就会变成 leader,之后就会 return,下次进来他就会进入 leader 的角色。还有一个重要的 channel,就是 electionTimeout,直接 return,进入新一轮选举,还是 candidate,进入新一轮的选举,直到选成了 leader 以后。

然后进入 leader 的逻辑,其实 raft 算法里面核心的逻辑肯定都在 leader。

// runLeader runs the FSM for a leader. Do the setup here and drop into
// the leaderLoop for the hot loop.
func (r *Raft) runLeader() {
 r.logger.Info("entering leader state""leader", r)
 metrics.IncrCounter([]string{"raft""state""leader"}, 1)

 // Notify that we are the leader
 overrideNotifyBool(r.leaderCh, true)

 // Push to the notify channel if given
 if notify := r.conf.NotifyCh; notify != nil {
  select {
  case notify <- true:
  case <-r.shutdownCh:
  }
 }

 // setup leader state. This is only supposed to be accessed within the
 // leaderloop.
 r.setupLeaderState()

 // Cleanup state on step down
 defer func() {
  // Since we were the leader previously, we update our
  // last contact time when we step down, so that we are not
  // reporting a last contact time from before we were the
  // leader. Otherwise, to a client it would seem our data
  // is extremely stale.
  r.setLastContact()

  // Stop replication
  for _, p := range r.leaderState.replState {
   close(p.stopCh)
  }

  // Respond to all inflight operations
  for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
   e.Value.(*logFuture).respond(ErrLeadershipLost)
  }

  // Respond to any pending verify requests
  for future := range r.leaderState.notify {
   future.respond(ErrLeadershipLost)
  }

  // Clear all the state
  r.leaderState.commitCh = nil
  r.leaderState.commitment = nil
  r.leaderState.inflight = nil
  r.leaderState.replState = nil
  r.leaderState.notify = nil
  r.leaderState.stepDown = nil

  // If we are stepping down for some reason, no known leader.
  // We may have stepped down due to an RPC call, which would
  // provide the leader, so we cannot always blank this out.
  r.leaderLock.Lock()
  if r.leader == r.localAddr {
   r.leader = ""
  }
  r.leaderLock.Unlock()

  // Notify that we are not the leader
  overrideNotifyBool(r.leaderCh, false)

  // Push to the notify channel if given
  if notify := r.conf.NotifyCh; notify != nil {
   select {
   case notify <- false:
   case <-r.shutdownCh:
    // On shutdown, make a best effort but do not block
    select {
    case notify <- false:
    default:
    }
   }
  }
 }()

 // Start a replication routine for each peer
 r.startStopReplication()

 // Dispatch a no-op log entry first. This gets this leader up to the latest
 // possible commit index, even in the absence of client commands. This used
 // to append a configuration entry instead of a noop. However, that permits
 // an unbounded number of uncommitted configurations in the log. We now
 // maintain that there exists at most one uncommitted configuration entry in
 // any log, so we have to do proper no-ops here.
 noop := &logFuture{
  log: Log{
   Type: LogNoop,
  },
 }
 r.dispatchLogs([]*logFuture{noop})

 // Sit in the leader loop until we step down
 r.leaderLoop()
}

这个(setupLeaderState)是初始化一些 leader 的配置,然后接着就是 leader 起来起来以后,(startStopReplication)会给其他其他的每一个节点,启动一个复制的线程,就是说如果有日志复制的消息的话,会发给这些线程,会去执行发送的逻辑。这里面就是启动一个 goroutine,就是异步地启动,然后 replicate 就是整个复制的逻辑就,到时候我们也会详细的讲一下这个方法.

这个(noop)可以单独跟大家说一下,就是这个,每一个 leader 刚开始选举成功以后都会发生一个 noop 的日志,具体大家可以看一下,这个就是日志其实有很多类型,就是用应用程序需要用到一些命令,这个是一个可以操作的日志,他有什么用呢?(下面还有一些其他的日志的类型)它的作用就是其实主要是提交前面任期的 leader 没有提交的日志,这个在论文里面有有提到过。

接下来就是进入 leader 的,它的监听的一些事件,它就监听的稍微多一点, 首先他肯定也是可以监听 RPC 的(processRPC),可以接受投票,复制日志的,但是他只能接受比自己任期大的,接受以后他并不会真正的复制,他会退回到 follower 的状态.后面就是一个重要的是 commitCh,如果说有些提交的消息,会被发送到这个 channel 里,他会监听到,执行 commit 的逻辑,后面就是一个比较重要的是,就是 applyCh,客户端发送的请求会被打包成日志放到这里面来。然后他会做一个 dispatch,就是分发给其他的,刚才我们不是启动了很多复制的线程嘛,这里的话就是分发给那些其他节点。还有一个就是 lease,他会定期的检查,如果说还是没有得到大多数节点的投票,他就会退回到 follower,逻辑都比较清晰,你是哪个角色都会有相应的逻辑进行执行。

源码讲解的话,后面会根据调试的场景会再次讲解,大致启动的流程和逻辑就是这些。比如说你 New 一个 Raft 以后,刚才的逻辑已经加载进来了,但是因为他们互相之间还连不上,所以会进入这其中一段 Bootstrap,刚才我们把这个集群的信息都加载进来,它就会生成一条配置信息,然后用这个信息来启动集群,启动完成以后。然后就自动地选举出 leader,然后对外提供服务。

调试日志(28:30~58:40)

这个是 raft 的源码的讲解,那么接下来就是第二个重点,就是我们今天调试的一些场景,需要跟大家手把手地调试日志. 先编译,编译完以后,再启动,启动的话先看一下这个命令行,

./raft-demo --http_addr=127.0.0.1:7001 --raft_addr=127.0.0.1:7000 --raft_id=1 --raft_cluster=1/127.0.0.1:7000,2/127.0.0.1:8000,3/127.0.0.1:9000


调试场景(28:22~)

选举变化相关(28:22~38:38)

这里的话,我把每个场景都有一个分支都单独的切出来,到时候大家可以看一下.

先看启动场景:

集群启动后,follower 等待一个随机 election timeout 时间变成 candidate,然后发起投票,如果不能获得 majority 票数,则任期 term 会一直增加(未 pre-vote 情况)(branch: election-1)

启动以后,他会发起投票,经历了一个那个 election_timeout 的时间,然后就会到 candidate 的状态,比如说在源码里面的话,他刚开始起来是一个 follower,监听这些 channel,然后都其他 chanel 都没有消息,然后只有这个 heartbeatTimeout,会发送一个消息,他会进行 check,如果没有 leader 对他进行 heartbeat,他就会变成 candidate,然后就会触发新一轮的选举,进入 candidate 的状态,他需要两个投票才行,现在总共才只有一个。因为他根本就联系不上其他两个节点 2 和节点 3,然后就一直这样,26,27(任期),当然之前我也运行过,正常的话它是从 123456, 任期一直增加.当然这是在没有 B 投票的情况,投票其实是对这个产品的一个优化,就是说避免就这个任期一直增加,大概实现的思路就是你要联系上其他的节点,而且你要联系上,然后是能够 ping 到,然后你才会进行投票,大概是这样的一个逻辑啊,大家可以,自己去了解一下。

第二个场景,第二个场景是获得的 majority 的投票.

集群启动后,follower 等待一个随机 election timeout 时间变成 candidate,然后发起投票,获得 majority 票数的节点变成 leader (branch: election-2)

我们运行一下,你看他现在还是联系不上,我们现在再启动一个(节点),那集群里就有两个了,我们看发送投票请求,只有 3 失败了,之前 2 和 3 都失败了,就说明 2 是成功了的,正好这里的日志也印证了,比如说他现在是需要两个,然后他收到了两个,它就会进入 leader 状态,就是说他收到了大多数节点的投票,就会变成 leader。那我们看一下相关的这个逻辑。它是 follower,然后 timeout,然后就会变成 candidate,然后他会发送投票的请求,在这里(runCandidate)就会一直比较,如果说大于等于法定人数的话,那么它就会变成 leader,然后就退出,退出以后,就会执行 leader 的逻辑。

我们看第三个场景。

leader 选举成功后发送 heartbeat 保持 leader 的地位(branch: election-3)

我们把原来 raft 的数据清理一下,启动一个节点,两个节点,我马上给您起来吧。现在 leader 就是上面这个节点,选举成功以后,它会不停的给其他两个节点发送心跳的请求,leader 会给 2,3 发送心跳的信息,保持 leader 地位,其他的节点就不会触发选举。

第四个场景

leader 失去 majority 节点的 heartbeat 响应,退回到 follower(branch: election-4)

这是正常的 heartbeat 的,然后我们两个都停掉,最开始也是很正常的收到请求,后面就是一长串收不到了。失败了嘛,就降级成 follower 的状态,然后又没有其他的 leader 给他发送心跳的信息,然后就会进入 candidate 的状态。然后又发送投票信息,当然现在肯定都是失败的,然后就任期就一直增加,不停的发送投票,我看一下他是从降级的,我们直接看日志吧,到时候大家也可以像我一样看把日志打印出来,这里这样子的,他如果说比如说他联系上的是少于 quorum 的话就会变成 folowwer,我们看一下这个是哪里调用 checkleaderlease 函数的,就是刚才就是 leaderLoop 里面的 lease Channel,它会定时的去检测,如果联系上就没关系,如果联系不上的话就会降级。

这个是四个关于选举的场景,当然可能没有列举完啊,但是我觉得主要的场景其实也就是这些,大家如果根据这些打印的日志的上下文去搜一下打印日志的内容,会有更深的一些理解。

日志复制(38:38~)

后面下面就进入到我们,我们的最重要部分的日志复制。那现在就是回过回过来讲一下这个图,看一下我们详细的处理客户端请求的逻辑是什么样的

,首先看到这个图,第一步是客户端发送一个请求,发送一个命令给 leader,第二步他的服务层收到以后处理成相关的需要保存的日志数据,然后通过一致性模块,就是 raft replication 一致性模块,第三步就是并行的发送给其他的 follower,有大多数的节点,这里把它置灰了,比如说他宕机掉了,或者说他比较慢,都可以不用管它,只要有一个节点复制成功了,那么他就可以申请第五步提交,提交以后然后会执行到状态机,因为我们执行的状态机,然后都会执行完以后,然后就返回给客户端成功,然后接着就是异步地把提交的信息发送给 follower, 然后 folower 把这个命令到到状态机里面去执行。然后因为你的状态以及初始状态是一致的,然后执行作业命令也是一致的,那么到这一步呢,所以这一刻的状态机的状态肯定都是一致的,都是 358,对吧。那么你集群里面所有节点的数据都就就都保持一致。这里要注意的是就是 7,这个正常来说应该是在 5 之后,就可以把这个信息发送给其他的节点。但是由于就是我们,commit 的策略吧,它是固定一定的时间间隔,把这个提交的信息发送给其他的其他节点,所以从日志上看起来的话,会是像是在返回给客户端以后,到时候我会打印,然后大家也会看到是这样。

(第一个场景)

leader 接收客户端请求,向集群内所有节点发送复制 RPC,所有都正常响应 -> 正常 commit,然后 apply 到状态机,最后返回客户端处理成功(branch: replicate-log-1)

我先模拟客户端,发送一个请求,然后在相应的每一步都打印出来,然后 follower 收到这样的信息,这里的话,我修改了一下,我把 commitTimeout 变成了五秒,正常的话他可能没这么长,他好像原本是 1 秒.

  • 第一步发送请求。
  • 第二步就是进行到自己的 log 区域里了
  • 第三步是两个并行的送给 follower,
  • 第四步就是返回,只要有一个返回,他马上就可以提交了
  • 第五步是提交 leader 的日志
  • 第六步是将日志实时同步到状态机。
  • 第七步返回给客户端成功
  • 第八步然后就是发送给客户端,提交信息,并且客户端把它更新到状态机里

接下来我跟大家可以跟踪一下这些日志打印的位置,然后大家可以看一下上下文他是如何实现的。

...

(跟着日志讲解日志复制的代码,跳转较多,文字无法充分表述,建议直接参考视频)

...

我们看第二个场景

leader 接收客户端请求,向集群内所有节点发送复制 RPC,majority 正常响应 -> 正常 commit,然后 apply 到状态机,最后返回客户端处理成功(branch: replicate-log-2)

第二个场景就是说,比如说我们三个节点,如果前两个宕掉它能够正常的对外提供服务需求吗?OK 的,你看其实也是 OK 的吧,虽然说咱这个节点宕掉了,但是其实也是 OK 的。

leader 接收客户端请求,向集群内所有节点发送复制 RPC,少于 majority 正常响应 -> 不能 commit(branch: replicate-log-3)

(第三个场景)如果把这两个都宕掉了,然后它只剩 leader,基本上就没办法处理请求了。肯定会失败。所以说节点数小于一半节点数,他就没法工作了。

总结

今天的介绍大概的内容就是这些,然后,我相信这个是分享,主要是跟大家分享一下 raft 的源码啊,还有就是原版里面具体的选举和日志复制的一个实现,大家可以根据这个打印的日志梳理一下上下文,然后又可以更清楚的了解他的一个具体的实现。好的,那这个分享大概就是这些.

QA

  • 1.这个库和 ETCD 的库有什么区别?

就说一下共同点吧,就是都是 go 写的,ETCD 那个库的话,他其实只实现了 raft 核心的部分,就是那个最核心的部分的存储和网络传输,那些都交给了,就是用户自己去实现,所以你需要去实现,很多其他的那个工作量,还有一些其他的开发量,比如说保存,存储的发送,都是需要你去实现这些逻辑。但是,还是要客观,那就相当于是全套的. 一整套的你都可以用,也比较简单,可以开箱即用,并且这个 ETCD 这个 raft 库写的更抽象一点,就是把核心算法做出成了一个状态机。他的状态不断的在 ready 里面去暴露,你需要去存储,去发送信息,这个其实是最最主要的一点,就是说,如果说你需要一些高性能的,或者说你需要一些,很多优化的,我觉得可能 ETCD 库会更好一点,但是我觉得如果说你要了解 raft 的算法,去了解 raft 协议的内容,这个库更好一点。就是说工程上的话,我觉得可能那个 etcd raft 会更好点的就是实际应用。

  • 2.怎么做测试?

这个我可能没有更多的经验,我觉得可能还是。就是多预想一些 case 吧,就是比如说,就是你可能自己去构造一个 case 的场景,对写对应的测试用例就可以。

  • 3.三个节点,宕机两个,剩下一个不可用,怎么处理请求的强一致?

这个时候服务应该是不可用的,当然如果要强行提供查询的服务,强一致肯定是无法保证的。

  • 4.客户单独请求不经过 raft 模块吗?

当然这个的话,大家可以去看一下,consul, 他其实提供了很多的一致性的模型呢,就是你可能不需要强一致性,也需要强一致性,他有不同的模型,如果不需要强一致性,那你可以不需要经过任何的模块,如果说你需要强一致性,那最简单的就是通过 raft 的模块,就是你的日志复制到了大多数节点上,那么你才给客户端返回,这个是最简单的,其他的可能还有一些更优雅或者更好的设计,这个大家可以去了解下

  • 5.这个库状态机实现不需要暴露读接口吗?

对,其实这两个我觉得这三个吧,其实都是关于一致性的问题,其实都都算是一个问题吧, 其实那个 apply 那其实你也可以读,也可以写啊,都可以。

  • 6.节点之间的长连接怎么建立并通信,源码在哪儿?

在这个文件(raft-demo/net_transport.go)里面,他是单独的一个模块写的,都在这里面,他怎么样做去做 RPC 的分发,接收响应都在里面,大家可以,你们可以去看一下。

  • 7.第七步未成功,leader 会给 client 正常返回吗?

那肯定肯定是不会啊,不会返回的呀,你没处理成功,肯定不会。但是对 leader,其实他收不到吗?收不到就是 timeout 了,正常的话,你他客户端 timeout 了,你肯定会去执行一下你客户端的逻辑,你比如说正常的请求,就是你客户端 timeout 了,那我肯定不会,不可能认为他是执行完成或者没完成,那可能会有一些一次查询什么的,所以你的客户端会有一定的开发工作量,大概他在这这里(论文)有一些简单的简单的介绍啊,你可以看一下,客户端开发会有一些,比如说怎么样防止 重复请求,涉及到客户端一些的开发的工作量,那是肯定是不可能,肯定就不会返回成功。

超时的话,leader 肯定是反馈给一个错误,就是请求请求失败给客户端。但是他会知道是,比如说看你看这个,不是有个 timeout 的实现吗?那你肯定知道是 timeout 的失败的,那么其实对于我客户端的视角来说, timeout 其实我并不知道这个请求是处理成功还是处理,(失败了对吧?) 我不能默认他成功,也不能默认他失败,那我对于我客户端,我是不是得去查询一下之类的一些动作。失败了,客户端会反馈给他失败的,实际有没有执行的话,可能你需要去,通过业务场景去查询一下,我认为是这样子的。

  • 8.你在 debug 的这个库的这个 raft 的算法你大概用了多长时间?这样也可以帮助,现在可能还没有去学,但是可能想要去学,他可以预估一下这个大概的时间?

就是首先要去看这个源码,要去做这个的 debug 的话,肯定大家要对 raft 协议有一个那个最基本的认识,就是至少这个论文我不管中文还是英文,应该有有读,过了时间的话可能跟每个人不同,以我自己的话,论文我大概前前后后大概看了有两三个月吧,也不是一直在看,就偶尔有空看一下就是,反正我是对照中英文,然后以英文为主,然后再来看。看完以后,然后就当时确实也比较懵逼吧,然后就想着是要看一下源码实现才会更深刻地理解,就找到了这个库,然后不是直接就看库,然后就看会搜一些,简单的,就像我的特约服务一样,其实别人也有写的,就像我刚才那种那种跟踪跟踪代码的方法,然后去一步一步的那个找到他的关键的逻辑,大概的话,看源码的话,其实如果你了解是了解的那个协议的话,就稍微比较快一点,我觉得可能就是可能几周吧,或者两三周我觉得差不多你就可以,你就可以看明白了。


视频回看


点击阅读原文,了解更多~