vlambda博客
学习文章列表

ETCD源码分析Client端启动流程分析

ETCD源码基于v3.5,在分析之前,需要搭建好源码分析的环境。首先,从GitHub的仓库中克隆下ETCD的源码,再利用docker搭建我们的ETCD测试集群,命令如下:

REGISTRY=quay.io/coreos/etcd
NAME_1=etcd-node-0
NAME_2=etcd-node-1
NAME_3=etcd-node-2
# IP在不同机器上不同,请查看docker的子网网段
HOST_1=172.20.0.2
HOST_2=172.20.0.3
HOST_3=172.20.0.4
PORT_1=2379
PORT_2=12379
PORT_3=22379
PORT_C_1=2380
PORT_C_2=12380
PORT_C_3=22380
CLUSTER=${NAME_1}=http://${HOST_1}:${PORT_C_1},${NAME_2}=http://${HOST_2}:${PORT_C_2},${NAME_3}=http://${HOST_3}:${PORT_C_3}
# 需要保证目录存在并可写
DATA_DIR=/var/folders/

# 需要创建docker网络,用于模拟集群网络分区的情况。
docker network create etcd_cluster

docker run \
-p $PORT_1:$PORT_1 \
-p $PORT_C_1:$PORT_C_1 \
--volume "${DATA_DIR}${NAME_1}:/etcd-data" \
--name ${NAME_1} \
--network etcd_cluster \
${REGISTRY}:v3.5.0 \
/usr/local/bin/etcd \
--name ${NAME_1} \
--data-dir /etcd-data \
--listen-client-urls http://0.0.0.0:$PORT_1 \
--advertise-client-urls http://$HOST_1:$PORT_1 \
--listen-peer-urls http://0.0.0.0:$PORT_C_1 \
--initial-advertise-peer-urls http://$HOST_1:$PORT_C_1 \
--initial-cluster ${CLUSTER} \
--initial-cluster-token tkn \
--initial-cluster-state new \
--log-level info \
--logger zap \
--log-outputs stderr

docker run \
-p $PORT_2:$PORT_2 \
-p $PORT_C_2:$PORT_C_2 \
--volume=${DATA_DIR}${NAME_2}:/etcd-data \
--name ${NAME_2} \
--network etcd_cluster \
${REGISTRY}:v3.5.0 \
/usr/local/bin/etcd \
--name ${NAME_2} \
--data-dir /etcd-data \
--listen-client-urls http://0.0.0.0:$PORT_2 \
--advertise-client-urls http://$HOST_2:$PORT_2 \
--listen-peer-urls http://0.0.0.0:$PORT_C_2 \
--initial-advertise-peer-urls http://$HOST_2:$PORT_C_2 \
--initial-cluster ${CLUSTER} \
--initial-cluster-token tkn \
--initial-cluster-state new \
--log-level info \
--logger zap \
--log-outputs stderr

docker run \
-p $PORT_3:$PORT_3 \
-p $PORT_C_3:$PORT_C_3 \
--volume=${DATA_DIR}${NAME_3}:/etcd-data \
--name ${NAME_3} \
--network etcd_cluster \
${REGISTRY}:v3.5.0 \
/usr/local/bin/etcd \
--name ${NAME_3} \
--data-dir /etcd-data \
--listen-client-urls http://0.0.0.0:$PORT_3 \
--advertise-client-urls http://$HOST_3:$PORT_3 \
--listen-peer-urls http://0.0.0.0:$PORT_C_3 \
--initial-advertise-peer-urls http://$HOST_3:$PORT_C_3 \
--initial-cluster ${CLUSTER} \
--initial-cluster-token tkn \
--initial-cluster-state new \
--log-level info \
--logger zap \
--log-outputs stderr

复制代码

如上,我们创建了三个ETCD节点,组成了一个集群。接下来我们正式进入源码分析流程。

ETCD Client启动流程分析

我们先看一段启动代码样例:

        cli, err := clientv3.New(clientv3.Config{
Endpoints: exampleEndpoints(),
DialTimeout: dialTimeout,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
_, err = cli.Put(ctx, "sample_key", "sample_value")
cancel()
if err != nil {
log.Fatal(err)
}
复制代码

一个最简单的程序只需要提供集群的所有节点的ip和端口就能访问,这里需要注意的是,一定要填写ETCD集群的所有节点,这样才能有故障转移、负载均衡的特性。或者运行一个ETCD的代理节点(ETCD网关)负责转发请求,这样只填写代理节点ip即可,当然性能上会有所损失。

一、ETCD的Client启动流程分析

接下来我们看看Client是如何被创建出来的:


func newClient(cfg *Config) (*Client, error) {
// -----A-----
ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
}
// -----A-----

// -----B-----
client.resolver = resolver.New(cfg.Endpoints...)

conn, err := client.dialWithBalancer()
if err != nil {
client.cancel()
client.resolver.Close()
return nil, err
}
client.conn = conn
// -----B-----

// -----C-----
client.Cluster = NewCluster(client)
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)

...
// -----C-----

return client, nil
}

复制代码
A段代码分析

首先来看第A段代码,其主要是初始化了一个client的实例,并把Config结构体传递给它,那么Config中包含了什么配置项呢?

type Config struct {
// ETCD服务器地址,注意需要提供ETCD集群所有节点的ip
Endpoints []string `json:"endpoints"`

// 设置了此间隔时间,每 AutoSyncInterval 时间ETCD客户端都会
// 自动向ETCD服务端请求最新的ETCD集群的所有节点列表
//
// 默认为0,即不请求
AutoSyncInterval time.Duration `json:"auto-sync-interval"`

// 建立底层的GRPC连接的超时时间
DialTimeout time.Duration `json:"dial-timeout"`

// 这个配置和下面的 DialKeepAliveTimeoutt
// 都是用来打开GRPC提供的 KeepAlive
// 功能,作用主要是保持底层TCP连接的有效性,
// 及时发现连接断开的异常。
//
// 默认不打开 keepalive
DialKeepAliveTime time.Duration `json:"dial-keep-alive-time"`

// 客户端发送 keepalive 的 ping 后,等待服务端的 ping ack 包的时长
// 超过此时长会报 `translation is closed`
DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"`

// 也是 keepalive 中的设置,
// true则表示无论有没有活跃的GRPC连接,都执行ping
// false的话,没有活跃的连接也就不会发送ping。
PermitWithoutStream bool `json:"permit-without-stream"`

// 最大可发送字节数,默认为2MB
// 也就是说,我们ETCD的一条KV记录最大不能超过2MB,
// 如果要设置超过2MB的KV值,
// 只修改这个配置也是无效的,因为ETCD服务端那边的限制也是2MB。
// 需要先修改ETCD服务端启动参数:`--max-request-bytes`,再修改此值。
MaxCallSendMsgSize int

// 最大可接收的字节数,默认为`Int.MaxInt32`
// 一般不需要改动
MaxCallRecvMsgSize int

// HTTPS证书配置
TLS *tls.Config

// 上下文,一般用于取消操作
ctx.Context

// 设置此值,会拒绝连接到低版本的ETCD
// 什么是低版本呢?
// 写死了,小于v3.2的版本都是低版本。
RejectOldCluster bool `json:"reject-old-cluster"`

// GRPC 的连接配置,具体可参考GRPC文档
DialOptions []grpc.DialOption

// zap包的Logger配置
// ETCD用的日志包就是zap
Logger *zap.Logger
LogConfig *zap.Config

...
}
复制代码

还有一些常用配置项,比较简单,这里就不再列出了。

B段代码分析

本段是整个代码的核心部分,主要做了两件事:

  1. 具体的概念就不展开了,如果对grpc这方面比较感兴趣,文末会推荐一个讲的很好的grpc源码分析博客。

    总之,etcd自己写了一个解析器,就在resolver包里,这个解析器提供了以下几个功能:

    1. 告诉grpc,如果Endpoints有多个,负载均衡的策略是轮询,这点很重要。

  2. dialWithBalancer() 建立了到ETCD的服务端链接

func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
creds := c.credentialsForEndpoint(c.Endpoints()[0])
opts := append(dopts, grpc.WithResolvers(c.resolver))
return c.dial(creds, opts...)
}
复制代码

这个用于建立到ETCD服务端的连接的方法名很有意思,虽然叫dialWithBalancer但内部代码很简单,可以看到里面并无Balancer(负载均衡器)的出现。但其实因为上面说到,ETCD使用了自己的resolver,其内部已经写好了负载均衡策略:round_robin。所以这里通过grpc.WithResolvers()把resolver传进去,也是达到了负载均衡的效果。

接下来进入dial(),这个方法虽然有些长,但整体逻辑是非常清晰的,省略无关代码后,其内部是做了以下几件事:

func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
// 首先,ETCD通过这行代码,向GRPC框架加入了一些自己的
// 配置,比如:KeepAlive特性(配置里提到的配置项)、
// TLS证书配置、还有最重要的重试策略。
opts, err := c.dialSetupOpts(creds, dopts...)
...

// context 的一段经典样例代码
// 问:如果我同时把非零的DialTimeout和
// 带超时的 context 传给客户端,
// 到底以哪个超时为准?
// 答:这里新建了子context(dctx),父context和DialTimeout
// 哪个先到deadline,就以哪个为准。
dctx := c.ctx
if c.cfg.DialTimeout > 0 {
var cancel context.CancelFunc
// 同时包含父context和DialTimeout
// 哪个先倒时间就以哪个为准。
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel()
}

// 最终调用grpc.DialContext()建立连接
conn, err := grpc.DialContext(dctx, target, opts...)
...
return conn, nil
}
复制代码
C段代码分析

C段代码无非就是做一些功能接口的初始化,比如:KV接口(用于提供Put、Get等)、Wathcer接口(用于监听Key)等,具体如何初始化到分析各接口再讲。

再回到启动流程,初始化功能完毕后,就是getToken了,这个token是我们开启了ETCD的账号密码功能后,通过账号密码获取到了token,然后才能访问ETCD提供的GRPC接口。

然后是提供 RejectOldCluster 和 autoSync 功能,这个在介绍Config时也提过,这里就不再赘述了。

ETCD Client重试策略分析

对ETCD客户端提供的自动重试策略的分析,是本文的重点。自动重试是ETCD能提供高可用特性的重要保证,在往下分析之前,一定要记住以下两个概念:

  1. 自动重试不会在ETCD集群的同一节点上进行,这跟我们平常做的重试不同,因为前面说了ETCD是通过GRPC框架提供对集群访问的负载均衡策略的,所以会轮询的重试集群的每个节点。

  2. 自动重试只会重试一些特定的错误,比如:codes.Unavailable

接下来,就让我们来看看ETCD是如何利用GRPC提供的拦截器做自动重试的,学会这个,我们也能在自己的GRPC项目中用上同样的套路:

    // 这段代码在dialWithBalancer->dial->dialSetupOpts中
rrBackoff := withBackoff(c.roundRobinQuorumBackoff(defaultBackoffWaitBetween, defaultBackoffJitterFraction))

opts = append(opts,
grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),
grpc.WithUnaryInterceptor(c.unaryClientInterceptor(withMax(defaultUnaryMaxRetries), rrBackoff)),
)

复制代码

看以上的代码,要自动重试只需两步:

  1. 创建backoff函数,也就是计算重试等待时间的函数。

  2. 通过WithXXXInterceptor(),注册重试拦截器,这样每次GRPC有请求都会回调该拦截器。

这里,grpc.WithStreamInterceptor(c.streamClientInterceptor(withMax(0), rrBackoff)),我们看到Stream的重试拦截器,其最大重试次数设置为了0(withMax()),也就是不重试,这其实是故意为之,因为Client端的Stream重试不被支持。(Client端需要重试Stream,需要自己做单独处理,不能通过拦截器。)

那我们首先看看如何计算等待时间:

// waitBetween 重试间隔时长
// jitterFraction 随机抖动率,
// 比如:默认重试间隔为25ms,抖动率:0.1,
// 那么实际重试间隔就在 25土2.5ms 之间。
// attempt 实际重试了多少次
func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration,
jitterFraction float64) backoffFunc {

return func(attempt uint) time.Duration {
n := uint(len(c.Endpoints()))
quorum := (n/2 + 1)
if attempt%quorum == 0 {
c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
return jitterUp(waitBetween, jitterFraction)
}
c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
return 0
}
}
复制代码

可以看到roundRobinQuorumBackoff返回了一个闭包,内部是重试间隔时长计算逻辑,这个逻辑说来也简单:

    1. 若重试次数已经达到集群的法定人数(quorum),则真正的计算间隔时长,
间隔时长到期后,才进行重试。
2. 否则,直接返回0,也就是马上重试。
复制代码

还记得刚才说的必须记住的两个概念吗?其中一点就是负载均衡策略写死是轮询,而这个重试逻辑一定要配合负载均衡是轮询策略,达到的效果是:假如你访问集群中的一台节点失败,可能是那台节点出问题了,但如果整个集群是好的,这时候马上重试,轮询到下台节点就行。

但是,如果重试多次,集群大多数节点(法定人数)都失败了,那应该是集群出问题了,这时候就需要计算间隔时间,等会儿再重试看看问题能不能解决。

这里也可以看到ETCD的Client端,考虑的细节问题是非常多的,一个简单的重试时间计算,也能进行逻辑上的小小优化。

那么重试拦截器又是如何实现的呢?接着看拦截器的相关代码:

func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClientInterceptor {
...
// 如果最大重试次数设置为0,那就不重试。
if callOpts.max == 0 {
return invoker(ctx, method, req, reply, cc, grpcOpts...)
}
var lastErr error
// 开始重试计数
for attempt := uint(0); attempt < callOpts.max; attempt++ {
// 计算重试间隔时间,并阻塞代码,等待
// 这里最终会调用到 roundRobinQuorumBackoff 来计算时间
if err := waitRetryBackoff(ctx, attempt, callOpts); err != nil {
return err
}

// 再次重新执行GRPC请求
lastErr = invoker(ctx, method, req, reply, cc, grpcOpts...)
if lastErr == nil {
// 重试成功,退出
return nil
}

// 这段代码分析了两种情况
// 1. 服务端返回了 Context Error(超时、被取消),直接重试
// 2. 客户端的 Context 也出现了Error
if isContextError(lastErr) {
if ctx.Err() != nil {
// 客户端本身的ctx也报错了,不重试了,退出。
return lastErr
}
// 服务端返回,直接重试
continue
}

if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
// 是AuthToken不正确,重新获取Token
gterr := c.getToken(ctx)
...
continue
}
// 只有在特定错误才重试(code.Unavailable)
// 否则返回Err,不重试。
if !isSafeRetry(c.lg, lastErr, callOpts) {
return lastErr
}
}
return lastErr
}
}
复制代码

代码做了一定程度的精简,但是主要流程都是保留的。

由此,ETCD的整体重试流程也介绍完毕了。

总结

通过对ETCD整个启动流程的代码分析,我们可以总结出以下几点:

1. Endpoints 用来做负载均衡和重试策略计算法定人数,一定要填写集群的全部节点,
或者打开AutoSync功能。

2. ETCD 自己编写了GRPC的resolver和balancer,可以借鉴到GRPC的相关项目中去。
resolver只能解析ip和unix套接字,balancer策略写死是轮询策略。

3. ETCD 重试流程只重试部分错误,所以不要完全指望ETCD的自动重试,一定要自己做好错误处理。

复制代码

启动流程图,其中列出的函数就是整个启动流程上的重要函数:

Config

New

newClient

resolver.New

dialWithBanlancer

dial

grpc.DialContext

最后,本文涉及到一些GRPC的基础知识,不了解的小伙伴可以去(blog.csdn.net/u011582922/… )这里看看,讲的很详细。