「gRPC系列」gRPC 服务发现组件 resolver
大家好,我是阿星,今天继续「gRPC系列:全面剖析gRPC 的设计原则与工作原理」的分享,这是第二篇:「gRPC 服务调用:服务发现 resolver 组件」。
前文」,已经了解了 gRPC 远程过程调用的基本原理以及学会使用 gRPC 进行远程服务调用了。
今天我们来了解一下 gRPC 是如何工作的,清楚的理解其调用逻辑,对于我们更好、更深入的使用gRPC很有必要。因此我们必须深度解析下gRPC的实现逻辑,在本文中,将从 gRPC 服务调用的第一个组件:「服务发现组件
- resolver 」开始学习。
序0:gRPC 远程调用
先看一段简单的客户端 gRPC 调用代码:
// 连接服务器conn, err := grpc.Dial(":10010", grpc.WithInsecure())if err != nil {fmt.Printf("faild to connect: %v", err)}defer conn.Close()c := hello.NewGreeterClient(conn)// 调用服务端的SayHellor, err := c.SayHello(context.Background(), &hello.HelloRequest{Name: "astar"})if err != nil {fmt.Printf("could not greet: %v", err)}fmt.Printf("Greeting: %s !\n", r.Message)
整体来说,调用的过程基本就是分为三步:
1. 创建 gRPC 连接
2. 创建业务客户端实例
3. 调用 RPC 接口
下面,我们先来看看第一个步骤「建立 gRPC 连接」的整体调用逻辑的时序图(可点击放大),有兴趣的话可以根据这张图去阅读一下 gRPC 的源码。
不过,上述流程图可能有些复杂,但是没关系,上述流程主要是让大家看一下 gRPC 建立连接的全貌,以便于能够更好地理解。后续系列会逐一分析每个步骤都是在干什么的以及怎么做的。目前只需要了解到这个流程的主要作用是获取一个 ClientConn 对象就完了。那么这个 ClientConn 对象又是什么呢?看一下 ClientConn 的结构:
type ClientConn struct {ctx context.Contextcancel context.CancelFunctarget stringparsedTarget resolver.TargetbalancerBuildOpts balancer.BuildOptionsresolverWrapper *ccResolverWrapperconns map[*addrConn]struct{}Keepalive parameter can be updated if a GoAway is received.mkp keepalive.ClientParameterscurBalancerName stringbalancerWrapper *ccBalancerWrapper......}
我省略了一些参数,只保留了一些需要关注的重要属性。其中整个 RPC 过程中比较重要的组件主要就是 resolver 和 balancer ,也就是上述结构中的 resolverWrapper 和 balancerWrapper 属性。我根据上述流程图,画了一个更清晰的 gRPC 内部关键组件交互图:
下面就先来了解一下 gRPC 服务发现组件: resolover 的工作原理。
序1:gRPC 服务发现核心组件 resolver 介绍
gRPC 的 resolver 组件主要由如下几个结构构成:
他们之间的工作流程如下:
gRPC client 中维护着一个存放 resolver build 的map全局变量,可以支持多种 resolver build 组件的注册,依赖对应的 Scheme 来解析对应的 resolver build
基于对应的 resolver builder 并依赖对应的的 target 参数,Build 出对应的 resolver
依赖实现了 ClientConn 的 resolverWrapper 来具体实现后续对应的连接管理的操作
序2:resolver 工作原理与源码分析
gRPC 获取 resolver 的入口为 parseTargetAndFindResolver 函数,下面就从源码层面上来分析一下其工作原理。
先看一下 parseTarget 这个函数,后面 parseTargetAndFindResolver里会用到:
func parseTarget(target string) (resolver.Target, error) {//解析target,target的val为":10010"u, err := url.Parse(target)if err != nil {//解析地址出错,生成默认的空的resolver.Targetreturn resolver.Target{}, err}endpoint := u.Pathif endpoint == "" {endpoint = u.Opaque}//没有解析出错,就会构建对应的 target 相关参数endpoint = strings.TrimPrefix(endpoint, "/")return resolver.Target{Scheme: u.Scheme,Authority: u.Host,Endpoint: endpoint,URL: *u,}, nil}
再回到 parseTargetAndFindResolver函数:
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)var rb resolver.Builder//1. 这一步因为target的scheme为空,所以解析会出错,// 就会直接跳过else的代码parsedTarget, err := parseTarget(cc.target)if err != nil {channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)} else {channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)rb = cc.getResolver(parsedTarget.Scheme)if rb != nil {cc.parsedTarget = parsedTargetreturn rb, nil}}//2. 没有获取到resolver则使用默认的scheme,即passthoughtdefScheme := resolver.GetDefaultScheme()channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)canonicalTarget := defScheme + ":///" + cc.target//3. 再次解析target,这次会解析成功,并且GetDefaultScheme时已经// 隐式注册了passthought 的 resolver builderparsedTarget, err = parseTarget(canonicalTarget)if err != nil {channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)return nil, err}channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)//4. 获取到 passthought resolver builderrb = cc.getResolver(parsedTarget.Scheme)if rb == nil {return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)}cc.parsedTarget = parsedTargetreturn rb, nil}
至此,解析获取 resolver builder 的过程就完结了,其实并不复杂,我们总结一下,看看 resolver 是怎么解析的。
一个target的构成单元为:scheme + ://author/+endpoint
通过不同的 scheme 解析并构建出对应的 target 对象。下面再来看看 gRPC 默认的 passthrough resolver 的工作原理:
package passthroughimport "google.golang.org/grpc/resolver"const scheme = "passthrough"type passthroughBuilder struct{}func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {//1. 直接将地址放到resolver中即可,// 如果是dns解析器会通过该函数解析到实际的地址,并后续进行连接r := &passthroughResolver{target: target,cc: cc,}//直接基于上述地址触发balancer的构建,基于balancer来管理连接r.start()return r, nil}func (*passthroughBuilder) Scheme() string {return scheme}type passthroughResolver struct {target resolver.Targetcc resolver.ClientConn}func (r *passthroughResolver) start() {// 依赖cc来管理连接,cc底层会触发balancer的构建r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})}func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}func (*passthroughResolver) Close() {}func init() {resolver.Register(&passthroughBuilder{})}
序3:基于etcd实现的自定义grpc resolver组件
gRPC 内置的服务发现组件过于单调了,下面,我们基于实现一套更为负载的grpc组件。
基于 etcd 实现一套基于 etcd 的自定义 resolver 组件:
packageresolverimport ("context""fmt""go.etcd.io/etcd/api/v3/mvccpb""google.golang.org/grpc/resolver""log""strings""time"clientv3 "go.etcd.io/etcd/client/v3")const scheme = "etcd"var cli *clientv3.Client// etcdResolver etcd解析器type etcdResolver struct {etcdAddr stringclientConn resolver.ClientConn}//初始化一个etcd解析器func NewResolver(etcdAddr string) resolver.Builder {return &etcdResolver{etcdAddr: etcdAddr}}func (r *etcdResolver) Scheme() string {return schema}//watch有变化以后会调用func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {log.Println("ResolveNow")fmt.Println(rn)}//解析器关闭时调用func (r *etcdResolver) Close() {log.Println("Close")}//构建解析器 grpc.Dial()同步调用func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {var err error//构建etcd clientif cli == nil {cli, err = clientv3.New(clientv3.Config{Endpoints: strings.Split(r.etcdAddr, ";"),DialTimeout: 15 * time.Second,})if err != nil {fmt.Printf("连接etcd失败:%s\n", err)return nil, err}}r.clientConn = clientConn//监听服务地址变化go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")return r, nil}//监听etcd中某个key前缀的服务地址列表的变化func (r *etcdResolver) watch(keyPrefix string) {//初始化服务地址列表var addrList []resolver.Addressresp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())if err != nil {fmt.Println("获取服务地址列表失败:", err)} else {for i := range resp.Kvs {addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})}}r.clientConn.UpdateState(resolver.State{Addresses:addrList})//监听服务地址列表的变化rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())for n := range rch {for _, ev := range n.Events {addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)switch ev.Type {case mvccpb.PUT:if !exists(addrList, addr) {addrList = append(addrList, resolver.Address{Addr: addr})r.clientConn.UpdateState(resolver.State{Addresses:addrList})}case mvccpb.DELETE:if s, ok := remove(addrList, addr); ok {addrList = sr.clientConn.UpdateState(resolver.State{Addresses:addrList})}}}}}func exists(l []resolver.Address, addr string) bool {for i := range l {if l[i].Addr == addr {return true}}return false}func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {for i := range s {if s[i].Addr == addr {s[i] = s[len(s)-1]return s[:len(s)-1], true}}return nil, false}
再来看看如何使用:
//注册etcd resolvermyResolver := myresolver.NewResolver("myetcdhostip:12379")resolver.Register(myResolver)// 连接服务器conn, err := grpc.Dial(myResolver.Scheme()+"://author/"+"GreetService", grpc.WithInsecure())if err != nil {fmt.Printf("faild to connect: %v", err)}defer conn.Close()
序4:总结
总结一下,在 gRPC 中,resolver 主要实现了如下功能:
定义 target
实现 resolver.Builder
实现 resolver.Resolver
调用 resolver.Register 注册自定义的 Resolver,其中 name 为 target 中的scheme
实现服务发现逻辑
