小米出品——gRPC Name Resolver 原理及实践
本文字数:5380 字
精读时间:12 分钟
也可在 6 分钟内完成速读
01
前言
02
实现自定义 Name Resolver
resolver.go
、
resolver_build.go
、
dail.go
。
ns # 自定义 resolver 包名├── dial.go # 封装了 gRPC 包的 grpc.DialContext() 方法,严格来说 dail.go 不应该放在 ns 包下,本例中这么做只是为简化包布局,方便读者理解├── resolver.go # 实现了 gRPC resolver 包 Resolver 接口的 nsResolver└── resolver_builder.go # 实现了 gRPC resolver 包 ResolverBuilder 接口的 nsResolverBuilder
03
定义 nsResolver
resolver.go
:
package nsimport ("context""encoding/json""fmt""strings""time""mypkg/internal/logz" // 私有日志包,基于 uber 开源的 zap 实现sdk "mypkg/internal/soa-sdk" // 私有 ns sdk 包,封装了内部 soa 平台进行服务发现的 sdk_ "google.golang.org/grpc""google.golang.org/grpc/resolver""google.golang.org/grpc/serviceconfig")const (// syncNSInterval 定义了从 NS 服务同步实例列表的周期syncNSInterval = 1 * time.Second)// nsResolver 实现了 resolver.Resolver 接口type nsResolver struct {target resolver.Targetcc resolver.ClientConnctx context.Contextcancel context.CancelFunc...}// watcher 轮询并更新指定 CalleeService 服务的实例变化func (r *nsResolver) watcher() {r.updateCC()ticker := time.NewTicker(syncNSInterval)for {select {// 当* nsResolver Close 时退出监听case <-r.ctx.Done():ticker.Stop()returncase <-ticker.C:// 调用* nsResolver.updagteCC() 方法,更新实例地址r.updateCC()}}}// updateCC 更新 resolver.Resolver.ClientConn 配置func (r *nsResolver) updateCC() {// 从 NS 服务获取指定 target 的实例列表instances, err := r.getInstances(r.target)// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表if err != nil || len(instances.CalleeIns) == 0 {logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))return}...// 组装实例列表 []resolver.Address// resolver.Address 结构体表示 grpc server 端实例地址var newAddrs []resolver.Addressfor k := range instances.CalleeIns {newAddrs = append(newAddrs, instances.CalleeIns)}...// 更新实例列表// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述r.cc.UpdateState(resolver.State{Addresses: newAddrs})}// ResolveNow 实现了 resolver.Resolver.ResolveNow 方法func (*nsResolver) ResolveNow(o resolver.ResolveNowOption) {}// Close 实现了 resolver.Resolver.Close 方法func (r *nsResolver) Close() {r.cancel()}// instances 包含调用方服务名、被调方服务名、被调方实例列表等数据type instances struct {callerService stringcalleeService stringcalleeIns []string}// getInstances 获取指定服务所有可用的实例列表func (r *nsResolver) getInstances(target resolver.Target) (s *instances, e error) {auths := strings.Split(target.Authority, "@")// auths[0] 为 callerService 名,target.Endpoint 为 calleeService 名// 通过自定义 sdk 从内部 NameServer 查询指定 calleeService 对应的实例列表ins, e := sdk.GetInstances(auths[0], target.Endpoint)if e != nil {return nil, e}return &instances{callerService: auths[0],calleeService: target.Endpoint,calleeIns: ins.Instances,}, nil}
04
定义 nsResolverBuilder
ns/resolver_builder.go
构建 nsResolver 时,我们参考
google.golang.org/grpc/resolver/dns/dns_resolver.go
源码,采用 Builder 设计模式:
package nsimport ("context""fmt""google.golang.org/grpc/resolver")// init 将定义好的 NS Builder 注册到 resolver 包中func init() {resolver.Register(NewBuilder())}// NewBuilder 构造 nsResolverBuilderfunc NewBuilder() resolver.Builder {return &nsResolverBuilder{}}// nsResolverBuilder 实现了 resolver.Builder 接口,用来构造定义好的 Resolver Buldertype nsResolverBuilder struct{}// URI 返回某个服务的统一资源描述符(URI),这个 URI 可以从 nsResolver 中查询实例列表// URI 设计时可以遵循 RFC-3986(https://tools.ietf.org/html/rfc3986) 规范,// 比如本例中 ns 格式为:ns://callerService:@calleeService// 其中 ns 为协议名,callerService 为订阅方服务名(即主调方服务名),calleeService 为发布方服务名(即被调方服务名)func URI(callerService, calleeService string) string {return fmt.Sprintf("ns://%s:@%s", callerService, calleeService)}// Build 实现了 resolver.Builder.Build 方法func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {ctx, cancel := context.WithCancel(context.Background())r := &nsResolver{target: target,cc: cc,ctx: ctx,cancel: cancel,}// 启动协程,响应指定 Name 服务实例变化go r.watcher()return r, nil}// Scheme 实现了 resolver.Builder.Scheme 方法// Scheme 方法定义了 ns resolver 的协议名func (*nsResolverBuilder) Scheme() string {return "ns"}
05
封装 gRPC.Dial() 方法
nsResolver
nsResolverBuilder
后,我们还需要对
grpc.Dial()
方法进行封装,方便业务方适用。封装后
dial.go
代码如下所示(
严格来说
dial.go
不应该放在
ns
包中,本例中这么做只是为简化包布局,方便读者理解
)
:
package ns// Dial 封装 `grpc.Dial()` 方法以供业务方代码初始化 *grpc.ClientConn。// 业务方可使用此 Dial 方法基于主调方服务名、被调方服务名等参数构造 *grpc.ClientConn 实例,// 随后可在业务代码中使用 *grpc.ClientConn 实例构造桩代码中生成的 grpcServiceClient 并发起 RPC 调用。func Dial(callerService, calleeService string, dialOpts ...grpc.DialOption) (*grpc.ClientConn, error) {// 根据 callerService 和 calleeService 构造对应的 URIURI := ns.URI(callerService, calleeService)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()// 设置拨号配置opts := []grpc.DialOption{grpc.WithBlock(),grpc.WithInsecure(),}dialOpts = append(dialOpts, dialOpts...)// 调用 grpc.DialContext() 方法拨号conn, err := grpc.DialContext(ctx,URI,opts...,)if err != nil {logz.Warn("did not connect", logz.Any("target", URI), logz.E(err))return nil, err}return conn, err}
06
gRPC resolver 原理
ns
中定义了两个 go 文件,
resolver.go
和
resolver_builder.go
。
-
前者是整个功能最核心的代码,通过自定义 nsResolver将服务名解析成对应实例。 -
后者是采用 Builder 模式在包初始化时创建并注册构造 nsResover的nsResolverBuilder实例。当客户端通过Dial方法对指定服务进行拨号时,grpc resolver 查找注册的 Builder 实例调用其Build()方法构建自定义nsResolver。
demo.pb
文件:
syntax = "proto3";package demo;service DemoService {rpc SayHi(HiRequest) returns (HiResponse);}message HiRequest {string name = 1;}message HiResponse {string message = 1;}
demo.pb.go
可能如下:
package demo...type DemoServiceClient interface {SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error)}type demoServiceClient struct {cc *grpc.ClientConn}// NewDemoServiceClient 业务代码中此方法来构造 *demoServiceClient 实例func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient {return &demoServiceClient{cc}}func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) {out := new(HiResponse)err := c.cc.Invoke(ctx, "/proto.DemoService/SayHiOK", in, out, opts...)if err != nil {return nil, err}return out, nil}...
*grpc.ClientConn
再发起 RPC 调用代码如下:
import "mypkg/internal/ns"...// 使用上节中封装的 ns.Dial 方法构造 *grpc.ClientConnconn, _ := ns.Dial("my-caller-service", "my-callee-service")// 构造 *demoServiceClientcli := demo.NewDemoServiceClient(conn)// 使用 *demoServiceClient 发起 RPC 调用res, _ := cli.SayHiOK(ctx, &proto.HiRequest{Name: "world"})...
import "mypkg/internal/ns"
包后,在
ns/resolver_builder.go
的 init 阶段会通过
Register()
方法将
nsResolverBuilder
注册到 grpc 内部的一个全局 map 中:
// m 定义为一个全局 map,用于存放 [resolver 协议名 -> resolverBuilder] 键值对var m = make(map[string]Builder)// Register 方法将指定 [resolver 协议名 -> resolverBuilder] 键值对存入 mapfunc Register(b Builder) {m[b.Scheme()] = b}// Get 方法根据传入的 resolver 协议名返回对应的 resolverBuilderfunc Get(scheme string) Builder {if b, ok := m[scheme]; ok {return b}return nil}
ns.Dial()
方法使用 callerService 和 calleeService 构造服务 URI,并使用此 URI 作为参数调用
grpc.DialContext()
方法,来构造
*grpc.ClientConn
实例。
grpc.DialContext()
方法接收三个参数:ctx、target、opts,
ns://my-caller-service:@my-callee-service
,其中
ns
为协议名。grpc 可通过协议名查表来获取对应的 resolverBuilder。opts:是一个变长参数,表示拨号配置选项。
grpc.DialContext()
内部逻辑比较复杂,我们挑重点讲:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {// 构造 ClientConn 实例cc := &ClientConn{target: target,csMgr: &connectivityStateManager{},conns: make(map[*addrConn]struct{}),dopts: defaultDialOptions(),blockingpicker: newPickerWrapper(),czData: new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}cc.retryThrottler.Store((*retryThrottler)(nil))cc.ctx, cc.cancel = context.WithCancel(context.Background())for _, opt := range opts {opt.apply(&cc.dopts)}...// 如果用户指定了 timeout 超时配置,那么初始化一个带超时的 ctx// 如果 defer 阶段已超时,则抛出 j 错误if cc.dopts.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)defer cancel()}defer func() {select {case <-ctx.Done():conn, err = nil, ctx.Err()default:}}()...// Name Resolver 核心逻辑,初始化 resolverBuilder,代码中首先会判断下用户是否指定 resolverBuilder// - 如果有指定 resolverBuilder,则直接使用此 resolverBuilder。// - 如果用户没有指定 resolverBuilder,那么 grpc 做如下操作:// - 通过 parseTarget 方法解析用户传入的 target,本例中即 `ns://my-caller-service:@my-callee-service`,获取 Scheme(协议名)、authority(包含 callerService、calleeService)。// - 查询指定协议对应的 resolverBuilder。if cc.dopts.resolverBuilder == nil {// 解析用户传入的 targetcc.parsedTarget = parseTarget(cc.target)// 通过协议名查表获取对应的 resolverBuildercc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)// 如果表中没查到对应的 resolverBuilder,则使用默认协议查询对应的 resolverBuilder// 默认协议为 `passthrough`,它会从用户解析的 target 中直接读取 endpoint 地址if cc.dopts.resolverBuilder == nil {cc.parsedTarget = resolver.Target{Scheme: resolver.GetDefaultScheme(),Endpoint: target,}cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)}} else {cc.parsedTarget = resolver.Target{Endpoint: target}}...// 使用上面初始化的 resolverBuilder 构建 resolver// 初始化 resolverWrapperrWrapper, err := newCCResolverWrapper(cc)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()// 如果客户端配置了 WithBlock option,则会轮询 ClientConn 状态,如果 ClientConn 就绪,则返回 ClientConn。// 如果直到 ctx 超时或被 Cancel ClientConn 依然未就绪,则抛出 ctx.Err() 错误。if cc.dopts.block {for {s := cc.GetState()// 1. 如果 ClientConn 状态为 Ready 则返回此 ClientConn// 2. 如果 ClientConn 状态并非 Ready,且用户配置了 FailOnNonTempDialError,当前 ClientConn 状态为 TransientFailure,且 lbPicker 尝试和服务端实例建立连接时产生错误。根据错误性质做如下处理:// 2.1. 如果此错误是非临时性的错误,则抛出此错误// 2.2. 如果此错误是临时性的错误,则继续轮询 ClientConn 状态,直至 ctx 超时或被外部 Cancelif s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {return nil, ctx.Err()}}}return cc, nil}
newCCResolverWrapper()
方法内部实现:
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {...ccr := &ccResolverWrapper{cc: cc,addrCh: make(chan []resolver.Address, 1),scCh: make(chan string, 1),}var err error// rb.Build() 调用指定 resolveBuilder 的 Build 方法,本例中会执行我们定义的 nsResolverBuilder.Builder() 方法ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})if err != nil {return nil, err}return ccr, nil}
ns/resolver_builder.go
中已经给出了
nsResolverBuilder
实现,我们再看下
nsResolverBuilder.Builder()
方法内部逻辑:
package ns// init 将定义好的 NS Builder 注册到 resolver 包中func init() {resolver.Register(NewBuilder())}...func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {ctx, cancel := context.WithCancel(context.Background())r := &nsResolver{target: target,cc: cc,ctx: ctx,cancel: cancel,}// 启动协程,轮询并更新指定 CalleeService 服务的实例变化go r.watcher()return r, nil}...
ns/resolver.go
中给出了
nsResolver
实现,我们再看下
nsResolver.watch()
方法:
package ns...// watcher 轮询并更新指定 CalleeService 服务的实例变化func (r *nsResolver) watcher() {r.updateCC()ticker := time.NewTicker(syncNSInterval)for {select {// 当* nsResolver Close 时退出case <-r.ctx.Done():ticker.Stop()returncase <-ticker.C:// 调用* nsResolver.updagteCC() 方法,更新实例地址r.updateCC()}}}// updateCC 更新 resolver.Resolver.ClientConn 配置func (r *nsResolver) updateCC() {// 从 NS 服务获取指定 target 的实例列表instances, err := r.getInstances(r.target)// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表if err != nil || len(instances.CalleeIns) == 0 {logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))return}...// 组装实例列表 []resolver.Address// resolver.Address 结构体表示 grpc server 端实例地址var newAddrs []resolver.Addressfor k := range instances.calleeIns {newAddrs = append(newAddrs, resolver.Address{Addr: instances.CalleeIns[k],})}...// 更新实例列表// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述。r.cc.UpdateState(resolver.State{Addresses: newAddrs})}...
07
总结
-
客户端启动时,引入自定义的 resolver 包(比如本例中我们自定义的 ns包) -
引入 ns包,在init()阶段,构造自定义的 resolveBuilder,并将其注册到 grpc 内部的 resolveBuilder 表中(其实是一个全局 map,key 为协议名,比如ns;value 为构造的 resolveBuilder,比如nsResolverBuilder)。 -
客户端启动时通过自定义 Dail()方法构造 grpc.ClientConn 单例 -
grpc.DialContext()方法内部解析 URI,分析协议类型,并从 resolveBuilder 表中查找协议对应的 resolverBuilder。比如本例中我们定义的 URI 协议类型为ns,对应的 resolverBuilder 为nsResolverBuilder -
找到指定的 resolveBuilder 后,调用 resolveBuilder 的 Build()方法,构建自定义 resolver,同时开启协程,通过此 resolver 更新被调服务实例列表。 -
Dial()方法接收主调服务名和被调服务名,并根据自定义的协议名,基于这两个参数构造服务的 URI -
Dial()方法内部使用构造的 URI,调用grpc.DialContext()方法对指定服务进行拨号 -
grpc 底层 LB 库对每个实例均创建一个 subConnection,最终根据相应的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
聪明又努力的 Gophers,让我知道你“在看”
