「gRPC系列」gRPC 服务发现组件 resolver
大家好,我是阿星,今天继续「gRPC系列:全面剖析gRPC 的设计原则与工作原理」的分享,这是第二篇:「gRPC 服务调用:服务发现 resolver 组件」。
前文」,已经了解了 gRPC 远程过程调用的基本原理以及学会使用 gRPC 进行远程服务调用了。
今天我们来了解一下 gRPC 是如何工作的,清楚的理解其调用逻辑,对于我们更好、更深入的使用gRPC很有必要。因此我们必须深度解析下gRPC的实现逻辑,在本文中,将从 gRPC 服务调用的第一个组件:「服务发现组件
- resolver 」开始学习。
序0:gRPC 远程调用
先看一段简单的客户端 gRPC 调用代码:
// 连接服务器
conn, err := grpc.Dial(":10010", grpc.WithInsecure())
if err != nil {
fmt.Printf("faild to connect: %v", err)
}
defer conn.Close()
c := hello.NewGreeterClient(conn)
// 调用服务端的SayHello
r, err := c.SayHello(context.Background(), &hello.HelloRequest{Name: "astar"})
if err != nil {
fmt.Printf("could not greet: %v", err)
}
fmt.Printf("Greeting: %s !\n", r.Message)
整体来说,调用的过程基本就是分为三步:
1. 创建 gRPC 连接
2. 创建业务客户端实例
3. 调用 RPC 接口
下面,我们先来看看第一个步骤「建立 gRPC 连接」的整体调用逻辑的时序图(可点击放大),有兴趣的话可以根据这张图去阅读一下 gRPC 的源码。
不过,上述流程图可能有些复杂,但是没关系,上述流程主要是让大家看一下 gRPC 建立连接的全貌,以便于能够更好地理解。后续系列会逐一分析每个步骤都是在干什么的以及怎么做的。目前只需要了解到这个流程的主要作用是获取一个 ClientConn 对象就完了。那么这个 ClientConn 对象又是什么呢?看一下 ClientConn 的结构:
type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
target string
parsedTarget resolver.Target
balancerBuildOpts balancer.BuildOptions
resolverWrapper *ccResolverWrapper
conns map[*addrConn]struct{}
Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
curBalancerName string
balancerWrapper *ccBalancerWrapper
......
}
我省略了一些参数,只保留了一些需要关注的重要属性。其中整个 RPC 过程中比较重要的组件主要就是 resolver 和 balancer ,也就是上述结构中的 resolverWrapper 和 balancerWrapper 属性。我根据上述流程图,画了一个更清晰的 gRPC 内部关键组件交互图:
下面就先来了解一下 gRPC 服务发现组件: resolover 的工作原理。
序1:gRPC 服务发现核心组件 resolver 介绍
gRPC 的 resolver 组件主要由如下几个结构构成:
他们之间的工作流程如下:
gRPC client 中维护着一个存放 resolver build 的map全局变量,可以支持多种 resolver build 组件的注册,依赖对应的 Scheme 来解析对应的 resolver build
基于对应的 resolver builder 并依赖对应的的 target 参数,Build 出对应的 resolver
依赖实现了 ClientConn 的 resolverWrapper 来具体实现后续对应的连接管理的操作
序2:resolver 工作原理与源码分析
gRPC 获取 resolver 的入口为 parseTargetAndFindResolver 函数,下面就从源码层面上来分析一下其工作原理。
先看一下 parseTarget 这个函数,后面 parseTargetAndFindResolver里会用到:
func parseTarget(target string) (resolver.Target, error) {
//解析target,target的val为":10010"
u, err := url.Parse(target)
if err != nil {
//解析地址出错,生成默认的空的resolver.Target
return resolver.Target{}, err
}
endpoint := u.Path
if endpoint == "" {
endpoint = u.Opaque
}
//没有解析出错,就会构建对应的 target 相关参数
endpoint = strings.TrimPrefix(endpoint, "/")
return resolver.Target{
Scheme: u.Scheme,
Authority: u.Host,
Endpoint: endpoint,
URL: *u,
}, nil
}
再回到 parseTargetAndFindResolver函数:
func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
channelz.Infof(logger, cc.channelzID, "original dial target is: %q", cc.target)
var rb resolver.Builder
//1. 这一步因为target的scheme为空,所以解析会出错,
// 就会直接跳过else的代码
parsedTarget, err := parseTarget(cc.target)
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", cc.target, err)
} else {
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
rb = cc.getResolver(parsedTarget.Scheme)
if rb != nil {
cc.parsedTarget = parsedTarget
return rb, nil
}
}
//2. 没有获取到resolver则使用默认的scheme,即passthought
defScheme := resolver.GetDefaultScheme()
channelz.Infof(logger, cc.channelzID, "fallback to scheme %q", defScheme)
canonicalTarget := defScheme + ":///" + cc.target
//3. 再次解析target,这次会解析成功,并且GetDefaultScheme时已经
// 隐式注册了passthought 的 resolver builder
parsedTarget, err = parseTarget(canonicalTarget)
if err != nil {
channelz.Infof(logger, cc.channelzID, "dial target %q parse failed: %v", canonicalTarget, err)
return nil, err
}
channelz.Infof(logger, cc.channelzID, "parsed dial target is: %+v", parsedTarget)
//4. 获取到 passthought resolver builder
rb = cc.getResolver(parsedTarget.Scheme)
if rb == nil {
return nil, fmt.Errorf("could not get resolver for default scheme: %q", parsedTarget.Scheme)
}
cc.parsedTarget = parsedTarget
return rb, nil
}
至此,解析获取 resolver builder 的过程就完结了,其实并不复杂,我们总结一下,看看 resolver 是怎么解析的。
一个target的构成单元为:scheme + ://author/+endpoint
通过不同的 scheme 解析并构建出对应的 target 对象。下面再来看看 gRPC 默认的 passthrough resolver 的工作原理:
package passthrough
import "google.golang.org/grpc/resolver"
const scheme = "passthrough"
type passthroughBuilder struct{}
func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
//1. 直接将地址放到resolver中即可,
// 如果是dns解析器会通过该函数解析到实际的地址,并后续进行连接
r := &passthroughResolver{
target: target,
cc: cc,
}
//直接基于上述地址触发balancer的构建,基于balancer来管理连接
r.start()
return r, nil
}
func (*passthroughBuilder) Scheme() string {
return scheme
}
type passthroughResolver struct {
target resolver.Target
cc resolver.ClientConn
}
func (r *passthroughResolver) start() {
// 依赖cc来管理连接,cc底层会触发balancer的构建
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
}
func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*passthroughResolver) Close() {}
func init() {
resolver.Register(&passthroughBuilder{})
}
序3:基于etcd实现的自定义grpc resolver组件
gRPC 内置的服务发现组件过于单调了,下面,我们基于实现一套更为负载的grpc组件。
基于 etcd 实现一套基于 etcd 的自定义 resolver 组件:
package
resolver
import (
"context"
"fmt"
"go.etcd.io/etcd/api/v3/mvccpb"
"google.golang.org/grpc/resolver"
"log"
"strings"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
)
const scheme = "etcd"
var cli *clientv3.Client
// etcdResolver etcd解析器
type etcdResolver struct {
etcdAddr string
clientConn resolver.ClientConn
}
//初始化一个etcd解析器
func NewResolver(etcdAddr string) resolver.Builder {
return &etcdResolver{etcdAddr: etcdAddr}
}
func (r *etcdResolver) Scheme() string {
return schema
}
//watch有变化以后会调用
func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
log.Println("ResolveNow")
fmt.Println(rn)
}
//解析器关闭时调用
func (r *etcdResolver) Close() {
log.Println("Close")
}
//构建解析器 grpc.Dial()同步调用
func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
var err error
//构建etcd client
if cli == nil {
cli, err = clientv3.New(clientv3.Config{
Endpoints: strings.Split(r.etcdAddr, ";"),
DialTimeout: 15 * time.Second,
})
if err != nil {
fmt.Printf("连接etcd失败:%s\n", err)
return nil, err
}
}
r.clientConn = clientConn
//监听服务地址变化
go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
return r, nil
}
//监听etcd中某个key前缀的服务地址列表的变化
func (r *etcdResolver) watch(keyPrefix string) {
//初始化服务地址列表
var addrList []resolver.Address
resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
if err != nil {
fmt.Println("获取服务地址列表失败:", err)
} else {
for i := range resp.Kvs {
addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
}
}
r.clientConn.UpdateState(resolver.State{Addresses:addrList})
//监听服务地址列表的变化
rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
for n := range rch {
for _, ev := range n.Events {
addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
switch ev.Type {
case mvccpb.PUT:
if !exists(addrList, addr) {
addrList = append(addrList, resolver.Address{Addr: addr})
r.clientConn.UpdateState(resolver.State{Addresses:addrList})
}
case mvccpb.DELETE:
if s, ok := remove(addrList, addr); ok {
addrList = s
r.clientConn.UpdateState(resolver.State{Addresses:addrList})
}
}
}
}
}
func exists(l []resolver.Address, addr string) bool {
for i := range l {
if l[i].Addr == addr {
return true
}
}
return false
}
func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
for i := range s {
if s[i].Addr == addr {
s[i] = s[len(s)-1]
return s[:len(s)-1], true
}
}
return nil, false
}
再来看看如何使用:
//注册etcd resolver
myResolver := myresolver.NewResolver("myetcdhostip:12379")
resolver.Register(myResolver)
// 连接服务器
conn, err := grpc.Dial(myResolver.Scheme()+"://author/"+"GreetService", grpc.WithInsecure())
if err != nil {
fmt.Printf("faild to connect: %v", err)
}
defer conn.Close()
序4:总结
总结一下,在 gRPC 中,resolver 主要实现了如下功能:
定义 target
实现 resolver.Builder
实现 resolver.Resolver
调用 resolver.Register 注册自定义的 Resolver,其中 name 为 target 中的scheme
实现服务发现逻辑