vlambda博客
学习文章列表

「gRPC系列」gRPC 服务发现组件 resolver

大家好,我是阿星,今天继续「gRPC系列:全面剖析gRPC 的设计原则与工作原理」的分享,这是第二篇:「gRPC  服务调用:服务发现 resolver 组件」。


前文」,已经了解了 gRPC 远程过程调用的基本原理以及学会使用 gRPC 进行远程服务调用了。


今天我们来了解一下 gRPC 是如何工作的,清楚的理解其调用逻辑,对于我们更好、更深入的使用gRPC很有必要。因此我们必须深度解析下gRPC的实现逻辑,在本文中,将从 gRPC  服务调用的第一个组件:「服务发现组件 

- resolver 」开始学习。

序0:gRPC 远程调用

先看一段简单的客户端 gRPC 调用代码:


// 连接服务器 conn, err := grpc.Dial(":10010", grpc.WithInsecure()) if err != nil { fmt.Printf("faild to connect: %v", err) } defer conn.Close()
c := hello.NewGreeterClient(conn) // 调用服务端的SayHello r, err := c.SayHello(context.Background(), &hello.HelloRequest{Name: "astar"}) if err != nil { fmt.Printf("could not greet: %v", err) } fmt.Printf("Greeting: %s !\n", r.Message)


整体来说,调用的过程基本就是分为三步:

    1. 创建 gRPC 连接

    2. 创建业务客户端实例

    3. 调用 RPC 接口


下面,我们先来看看第一个步骤「建立 gRPC 连接」的整体调用逻辑的时序图(可点击放大),有兴趣的话可以根据这张图去阅读一下 gRPC 的源码。



不过,上述流程图可能有些复杂,但是没关系,上述流程主要是让大家看一下 gRPC 建立连接的全貌,以便于能够更好地理解。后续系列会逐一分析每个步骤都是在干什么的以及怎么做的。目前只需要了解到这个流程的主要作用是获取一个 ClientConn 对象就完了。那么这个 ClientConn 对象又是什么呢?看一下 ClientConn 的结构:

type ClientConn struct { ctx context.Context cancel context.CancelFunc  target       string  parsedTarget resolver.Target  balancerBuildOpts balancer.BuildOptions  resolverWrapper *ccResolverWrapper conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters curBalancerName string  balancerWrapper *ccBalancerWrapper  ......}


我省略了一些参数,只保留了一些需要关注的重要属性。其中整个 RPC 过程中比较重要的组件主要就是 resolver 和 balancer ,也就是上述结构中的 resolverWrapper 和 balancerWrapper 属性。我根据上述流程图,画了一个更清晰的 gRPC 内部关键组件交互图:


「gRPC系列」gRPC 服务发现组件 resolver



下面就先来了解一下 gRPC 服务发现组件: resolover 的工作原理。


序1:gRPC 服务发现核心组件 resolver 介绍

gRPC 的 resolver 组件主要由如下几个结构构成:


「gRPC系列」gRPC 服务发现组件 resolver



他们之间的工作流程如下:



  1. gRPC client 中维护着一个存放 resolver build 的map全局变量,可以支持多种 resolver build 组件的注册,依赖对应的 Scheme 来解析对应的 resolver build

  2. 基于对应的 resolver builder  并依赖对应的的 target 参数,Build 出对应的 resolver 

  3. 依赖实现了 ClientConn 的 resolverWrapper 来具体实现后续对应的连接管理的操作


序2:resolver 工作原理与源码分析

gRPC 获取 resolver 的入口为 parseTargetAndFindResolver 函数,下面就从源码层面上来分析一下其工作原理。


先看一下 parseTarget 这个函数,后面 parseTargetAndFindResolver里会用到:


func parseTarget(target string) (resolver.Target, error) {//解析target,target的val为":10010" u, err := url.Parse(target) if err != nil {//解析地址出错,生成默认的空的resolver.Target return resolver.Target{}, err }  endpoint := u.Path if endpoint == "" { endpoint = u.Opaque }  //没有解析出错,就会构建对应的 target 相关参数 endpoint = strings.TrimPrefix(endpoint, "/") return resolver.Target{ Scheme: u.Scheme, Authority: u.Host, Endpoint: endpoint, URL: *u, }, nil}  


再回到 parseTargetAndFindResolver函数:


func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) { channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
var rb resolver.Builder  //1. 这一步因为target的scheme为空,所以解析会出错,  // 就会直接跳过else的代码 parsedTarget, err := parseTarget(cc.target) if err != nil { channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err) } else { channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget) rb = cc.getResolver(parsedTarget.Scheme) if rb != nil { cc.parsedTarget = parsedTarget return rb, nil }  }//2. 没有获取到resolver则使用默认的scheme,即passthought defScheme := resolver.GetDefaultScheme() channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme) canonicalTarget := defScheme + ":///" + cc.target  //3. 再次解析target,这次会解析成功,并且GetDefaultScheme时已经  // 隐式注册了passthought 的 resolver builder parsedTarget, err = parseTarget(canonicalTarget) if err != nil { channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err) return nil, err } channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)  //4. 获取到 passthought resolver builder rb = cc.getResolver(parsedTarget.Scheme) if rb == nil { return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme) } cc.parsedTarget = parsedTarget return rb, nil}


至此,解析获取 resolver builder 的过程就完结了,其实并不复杂,我们总结一下,看看 resolver 是怎么解析的。


一个target的构成单元为:scheme + ://author/+endpoint


通过不同的 scheme 解析并构建出对应的 target 对象。下面再来看看 gRPC 默认的 passthrough resolver 的工作原理:

package passthroughimport "google.golang.org/grpc/resolver"const scheme = "passthrough"type passthroughBuilder struct{}func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {//1. 直接将地址放到resolver中即可,// 如果是dns解析器会通过该函数解析到实际的地址,并后续进行连接 r := &passthroughResolver{ target: target, cc: cc, }//直接基于上述地址触发balancer的构建,基于balancer来管理连接 r.start() return r, nil}

func (*passthroughBuilder) Scheme() string { return scheme}

type passthroughResolver struct { target resolver.Target cc resolver.ClientConn}

func (r *passthroughResolver) start() {  // 依赖cc来管理连接,cc底层会触发balancer的构建 r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})}

func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}

func (*passthroughResolver) Close() {}

func init() { resolver.Register(&passthroughBuilder{})}



序3:基于etcd实现的自定义grpc resolver组件

gRPC 内置的服务发现组件过于单调了,下面,我们基于实现一套更为负载的grpc组件。


基于 etcd 实现一套基于 etcd 的自定义 resolver 组件:


package resolver
import ( "context" "fmt" "go.etcd.io/etcd/api/v3/mvccpb" "google.golang.org/grpc/resolver" "log" "strings" "time"  clientv3 "go.etcd.io/etcd/client/v3")
const scheme = "etcd"
var cli *clientv3.Client

// etcdResolver etcd解析器type etcdResolver struct { etcdAddr string clientConn resolver.ClientConn}

//初始化一个etcd解析器func NewResolver(etcdAddr string) resolver.Builder { return &etcdResolver{etcdAddr: etcdAddr}}
func (r *etcdResolver) Scheme() string { return schema}
//watch有变化以后会调用func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) { log.Println("ResolveNow") fmt.Println(rn)}
//解析器关闭时调用func (r *etcdResolver) Close() { log.Println("Close")}
//构建解析器 grpc.Dial()同步调用func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { var err error
//构建etcd client if cli == nil { cli, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(r.etcdAddr, ";"), DialTimeout: 15 * time.Second, }) if err != nil { fmt.Printf("连接etcd失败:%s\n", err) return nil, err } }
r.clientConn = clientConn  //监听服务地址变化 go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil}
//监听etcd中某个key前缀的服务地址列表的变化func (r *etcdResolver) watch(keyPrefix string) { //初始化服务地址列表 var addrList []resolver.Address
resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("获取服务地址列表失败:", err) } else { for i := range resp.Kvs { addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)}) } }
r.clientConn.UpdateState(resolver.State{Addresses:addrList})
//监听服务地址列表的变化 rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for n := range rch { for _, ev := range n.Events { addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case mvccpb.PUT: if !exists(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) r.clientConn.UpdateState(resolver.State{Addresses:addrList}) } case mvccpb.DELETE: if s, ok := remove(addrList, addr); ok { addrList = s r.clientConn.UpdateState(resolver.State{Addresses:addrList}) } } } }}
func exists(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false}


再来看看如何使用:


  //注册etcd resolver myResolver := myresolver.NewResolver("myetcdhostip:12379") resolver.Register(myResolver) // 连接服务器 conn, err := grpc.Dial(myResolver.Scheme()+"://author/"+"GreetService", grpc.WithInsecure()) if err != nil { fmt.Printf("faild to connect: %v", err) } defer conn.Close()


序4:总结


总结一下,在 gRPC 中,resolver 主要实现了如下功能:

  • 定义 target

  • 实现 resolver.Builder

  • 实现 resolver.Resolver

  • 调用 resolver.Register 注册自定义的 Resolver,其中 name 为 target 中的scheme

  • 实现服务发现逻辑