vlambda Blog

gRPC Framework Source Code Analysis

RPC (Remote Procedure Call) refers to invocation between remote services and is widely used in backend development. A common example is the RESTful style, which implements calls between remote services via HTTP methods such as POST and GET. gRPC, the subject of this article, is one of many inter-service call frameworks. It was developed by Google, has implementations in Go, C++, Java, and several other languages, and all of its code is open source (https://github.com/grpc/grpc). Since its release, gRPC has been widely adopted across the industry; it can be found in use at major internet companies both in China and abroad.

 

This article analyzes the grpc-go implementation (module google.golang.org/grpc, hosted at https://github.com/grpc/grpc-go). As the code below shows, starting a gRPC HelloWorld service is very simple and involves three steps: 1. listen on a service port in main (net.Listen); 2. initialize the gRPC server framework and register the service (grpc.NewServer and pb.RegisterHelloWorldServer); 3. start the gRPC service (s.Serve).

 

Steps 2 and 3 are where gRPC itself is involved, and the source analysis below is organized around these two parts.

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "xxx/xxx/helloworld"
)

const (
	gport = ":50051"
)

// GreeterServerImp is used to implement helloworld.GreeterServer.
type GreeterServerImp struct{}

// SayHi implements helloworld.GreeterServer.
func (s *GreeterServerImp) SayHi(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.GetMsg())
	return &pb.HelloReply{Msg: "Hello " + in.GetMsg()}, nil
}

func main() {
	lis, err := net.Listen("tcp", gport)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterHelloWorldServer(s, &GreeterServerImp{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}


I. Initializing the gRPC Service

 

Initializing a gRPC service consists of two parts: (1) grpc.NewServer initializes the framework: it sets up the server's configuration and, when workers are enabled, starts N worker goroutines that wait for business data to arrive; (2) pb.RegisterHelloWorldServer registers the business server, which is where the service-specific logic comes in.

 

1. Framework initialization

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}
	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}
	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}


(1) The option-applying loop at the top of NewServer initializes the server configuration using a common Go idiom. NewServer does not take the concrete settings as parameters; otherwise its parameter list would become unwieldy, and every new setting would force a signature change, which is inelegant. Instead, each setting is wrapped in a closure (a ServerOption), so the parameter list can be expressed uniformly as a variadic slice of option functions.

 

(2) The struct literal initializes Server, the most important data structure in the framework. Among its fields, opts holds the server configuration, which users set selectively as needed; services is the service table, a map from service name to a serviceInfo holding that service's handler functions and related metadata. Registering a new service is essentially just adding an entry to this map.

 

(3) chainUnaryServerInterceptors and chainStreamServerInterceptors set up the server's interceptor chain. Interceptors can be seen as the framework's hook functions and are the usual way to plug user-defined extension logic into gRPC.

 

(4) initServerWorkers (run only when numServerWorkers > 0) sets up the worker goroutines that process business logic. This is where framework code connects most closely with business code. The flow is initServerWorkers -> serverWorker -> handleStream, implemented below: the framework starts N goroutines that block waiting for input and then invoke the business logic, and handleStream is the entry point for all business processing.

 

(5) channelz.RegisterServer registers the channelz service. This service ships with the framework and is used to collect runtime statistics about the gRPC server; it can be enabled or disabled as needed.

// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}
// serverWorker blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	go s.serverWorker(ch)
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]
	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known server unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}


2. Registering the business service

pb.RegisterHelloWorldServer(...) registers a helloworld service, which is generated automatically from the corresponding helloworld proto file. This function fills in the services: make(map[string]*serviceInfo) table described above, which is how a new business service is registered with the gRPC framework:

func RegisterHelloWorldServer(s *grpc.Server, srv HelloWorldServer) {
	s.RegisterService(&_HelloWorld_serviceDesc, srv)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}

 

II. Starting the gRPC Service

 

A gRPC service is started by the s.Serve function, whose processing flow is Serve -> handleRawConn -> serveStreams, implemented as follows:

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	...
	for {
		rawConn, err := lis.Accept()
		...
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}
func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fallback to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}


(1) Serve blocks in lis.Accept waiting for client connection requests, handing each accepted connection to its own goroutine;

(2) handleRawConn processes the raw connection: because gRPC runs over HTTP/2, newHTTP2Transport upgrades the raw conn to an HTTP/2 transport;

(3) In serveStreams, each incoming stream is written to one worker's data channel; as atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers shows, streams are assigned to workers round-robin, falling back to a fresh goroutine when the chosen worker is busy.