vlambda Blog

gRPC Framework Source Code Analysis

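RPC (Remote Procedure Call) means calling a service running somewhere else, and it is used widely in backend development. A common example is the RESTful style, in which services call one another through HTTP methods such as POST and GET. gRPC is one of many frameworks for calls between services. It was developed by Google and is open source. It is widely used across the industry and shows up at most large companies in China and abroad. Many companies' in-house RPC frameworks also borrow ideas from gRPC.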

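Many articles already explain how to use the gRPC framework, so this post does not cover usage in detail. Starting a gRPC server looks very simple, as the code below shows. main first listens on a given port with net.Listen. It then initializes the gRPC server framework with grpc.NewServer and registers the service with pb.RegisterHelloWorldServer. Finally, it starts the server with s.Serve.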

package main

import (
	"context"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "xxx/xxx/helloworld"
)

const (
	gport = ":50051"
)

// GreeterServerImp is used to implement helloworld.HelloWorldServer.
type GreeterServerImp struct{}

// SayHi implements helloworld.HelloWorldServer.
func (s *GreeterServerImp) SayHi(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.GetMsg())
	return &pb.HelloReply{Msg: "Hello " + in.GetMsg()}, nil
}

func main() {
	// Listen on the TCP port the server will accept connections on.
	lis, err := net.Listen("tcp", gport)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// Create the gRPC server and register the service implementation with it.
	s := grpc.NewServer()
	pb.RegisterHelloWorldServer(s, &GreeterServerImp{})
	// Serve blocks, accepting connections until it fails or the server is stopped.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}


I. Initializing the gRPC server


1. The source code of NewServer is as follows:

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}


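(1) The loop over opt at the top of the function initializes the server configuration. It uses a common Go technique for configuration, the functional options pattern. NewServer does not take concrete settings as parameters. If it did, its parameter list would become unwieldy, and every new setting would force a change to its signature, which is not elegant. Instead, each setting is passed as a ServerOption that wraps a function (a closure) modifying the options struct. All inputs are therefore expressed uniformly as a variadic list of such options.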
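To make the functional-options idea concrete, here is a minimal, self-contained sketch with the same shape. It is not grpc-go's code. serverOptions is cut down to three fields, the default values are only illustrative, and WithWorkers and WithMaxRecvMsgSize are hypothetical option constructors. grpc-go's real constructors have other names, such as MaxRecvMsgSize and NumStreamWorkers.

```go
package main

import (
	"fmt"
	"time"
)

// serverOptions is a hypothetical, cut-down stand-in for grpc-go's unexported options struct.
type serverOptions struct {
	maxRecvMsgSize    int
	connectionTimeout time.Duration
	numServerWorkers  uint32
}

// defaultServerOptions holds illustrative defaults that NewServer copies first.
var defaultServerOptions = serverOptions{
	maxRecvMsgSize:    4 * 1024 * 1024,
	connectionTimeout: 120 * time.Second,
}

// ServerOption is anything that can modify serverOptions.
type ServerOption interface {
	apply(*serverOptions)
}

// funcServerOption adapts a plain closure to the ServerOption interface.
type funcServerOption struct {
	f func(*serverOptions)
}

func (fo *funcServerOption) apply(o *serverOptions) { fo.f(o) }

func newFuncServerOption(f func(*serverOptions)) *funcServerOption {
	return &funcServerOption{f: f}
}

// WithWorkers is a hypothetical option that sets the number of worker goroutines.
func WithWorkers(n uint32) ServerOption {
	return newFuncServerOption(func(o *serverOptions) { o.numServerWorkers = n })
}

// WithMaxRecvMsgSize is a hypothetical option that sets the maximum message size.
func WithMaxRecvMsgSize(n int) ServerOption {
	return newFuncServerOption(func(o *serverOptions) { o.maxRecvMsgSize = n })
}

type Server struct {
	opts serverOptions
}

// NewServer copies the defaults, then lets each option overwrite its own field.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	return &Server{opts: opts}
}

func main() {
	s := NewServer(WithWorkers(4))
	fmt.Println(s.opts.numServerWorkers, s.opts.maxRecvMsgSize) // 4 4194304
}
```

NewServer copies defaultServerOptions into a local variable before applying anything, so the package-level defaults are never mutated. Options run in order, so a later option overrides an earlier one that sets the same field. Adding a setting only needs a new field and a new constructor, and the NewServer signature never changes.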


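(2) The rest of NewServer builds the Server struct and finishes setting it up, and the struct's fields are especially important:

- opts holds the server configuration.
- services is the table of registered services. It maps a service name to a serviceInfo struct holding that service's handler functions and related parameters. Registering a new service writes into this map.

chainUnaryServerInterceptors(s) and chainStreamServerInterceptors(s) set up the server's interceptors. An interceptor can be viewed as a hook function that lets users extend the server's logic with their own code. s.initServerWorkers() initializes the server's worker goroutines, which run the business logic. Finally, channelz.RegisterServer registers the server with channelz, which collects runtime statistics about the server. It only runs when channelz is turned on, so it can be enabled as needed.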


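(3) initServerWorkers deserves special attention, and its code is shown below. The call chain is initServerWorkers -> serverWorker -> handleStream. The function starts numServerWorkers new goroutines, and each one waits for incoming data and then runs the business logic. handleStream is in fact the entry point for all business-logic processing! These workers exist only when numServerWorkers > 0, as the check in NewServer shows.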

// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}

// serverWorkers blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	go s.serverWorker(ch)
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]

	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known server unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}


2. Registering the business service

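pb.RegisterHelloWorldServer(s, &GreeterServerImp{}) registers a helloworld service. The pb package is generated automatically from the service's .proto file. This function fills in the services map described above (services: make(map[string]*serviceInfo)). Internally, the generated function passes a ServiceDesc describing the service's methods, together with the implementation, to s.RegisterService.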
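The following is a rough model of what that registration does. The generated code hands a service descriptor and the implementation to the server. The server then files the methods under the service's full name in the services map. Everything here (serviceDesc, methodDesc, registerService and the handler signature) is a hypothetical simplification of grpc-go's ServiceDesc and RegisterService. The sketch reports a duplicate registration as an error.

```go
package main

import "fmt"

// methodDesc and serviceDesc are hypothetical, simplified descriptors.
type methodDesc struct {
	name    string
	handler func(srv any, req string) (string, error)
}

type serviceDesc struct {
	serviceName string
	methods     []methodDesc
}

// serviceInfo is what the server stores per registered service.
type serviceInfo struct {
	serviceImpl any
	methods     map[string]methodDesc
}

type server struct {
	services map[string]*serviceInfo
}

// registerService indexes the descriptor's methods by name under the service name.
func (s *server) registerService(sd *serviceDesc, impl any) error {
	if _, dup := s.services[sd.serviceName]; dup {
		return fmt.Errorf("duplicate service registration for %q", sd.serviceName)
	}
	info := &serviceInfo{serviceImpl: impl, methods: make(map[string]methodDesc)}
	for _, m := range sd.methods {
		info.methods[m.name] = m
	}
	s.services[sd.serviceName] = info
	return nil
}

// greeter is a hypothetical user implementation.
type greeter struct{}

func (greeter) SayHi(msg string) string { return "Hello " + msg }

// helloDesc plays the role of the descriptor emitted by the code generator.
var helloDesc = serviceDesc{
	serviceName: "helloworld.HelloWorld",
	methods: []methodDesc{{
		name: "SayHi",
		handler: func(srv any, req string) (string, error) {
			// Assert the untyped implementation back to its concrete type.
			return srv.(greeter).SayHi(req), nil
		},
	}},
}

func main() {
	s := &server{services: make(map[string]*serviceInfo)}
	fmt.Println(s.registerService(&helloDesc, greeter{}))
	fmt.Println(s.registerService(&helloDesc, greeter{}))
	info := s.services["helloworld.HelloWorld"]
	out, _ := info.methods["SayHi"].handler(info.serviceImpl, "gRPC")
	fmt.Println(out)
}
```

The stored handler takes the implementation as an untyped value and asserts it back to a concrete type. Generated gRPC handlers do a similar type assertion to the service interface, which is how one generic server can call into user types it knows nothing about at compile time.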

 

II. Starting the gRPC server

The source code of s.Serve(lis) is as follows:

// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	...
	for {
		rawConn, err := lis.Accept()
		...
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}

	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup

	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fallback to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}


The processing flow of s.Serve is: Serve -> handleRawConn -> serveStreams.

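1. Serve blocks in lis.Accept() waiting for incoming connections and hands each accepted connection to handleRawConn on a new goroutine.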

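2. handleRawConn processes the raw connection, first performing the security handshake through useTransportAuthenticator. It then creates an HTTP/2 server transport for data processing with s.newHTTP2Transport(conn, authInfo) and serves that transport's streams on a new goroutine.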

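3. serveStreams sends each received stream to the channel of the selected worker. The index expression atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers shows that workers are chosen round-robin, so streams are spread evenly across them. The counter is local to each connection's serveStreams call. If the chosen worker is busy, the non-blocking select takes the default branch and handles the stream on a new goroutine instead.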