gRPC Framework Source Code Analysis
Plenty of articles already cover how to use the gRPC framework, so this article will not repeat that. Starting a gRPC service looks very simple, as the code below shows: main first listens on a designated port with net.Listen, then initializes the gRPC server framework (grpc.NewServer plus service registration), and finally starts the service with s.Serve.
```go
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"
    pb "xxx/xxx/helloworld"
)

const (
    gport = ":50051"
)

// GreeterServerImp is used to implement helloworld.GreeterServer.
type GreeterServerImp struct{}

// SayHi implements helloworld.GreeterServer.
func (s *GreeterServerImp) SayHi(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received: %v", in.GetMsg())
    return &pb.HelloReply{Msg: "Hello " + in.GetMsg()}, nil
}

func main() {
    lis, err := net.Listen("tcp", gport)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterHelloWorldServer(s, &GreeterServerImp{})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}
```
I. Initializing the gRPC Service
1. The source code of NewServer:
```go
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range opt {
        o.apply(&opts)
    }
    s := &Server{
        lis:      make(map[net.Listener]bool),
        opts:     opts,
        conns:    make(map[transport.ServerTransport]bool),
        services: make(map[string]*serviceInfo),
        quit:     grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
        czData:   new(channelzData),
    }
    chainUnaryServerInterceptors(s)
    chainStreamServerInterceptors(s)
    s.cv = sync.NewCond(&s.mu)
    if EnableTracing {
        _, file, line, _ := runtime.Caller(1)
        s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
    }
    if s.opts.numServerWorkers > 0 {
        s.initServerWorkers()
    }
    if channelz.IsOn() {
        s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
    }
    return s
}
```
(1) The opts := defaultServerOptions assignment and the loop that follows initialize the server configuration, using a common Go configuration idiom. NewServer does not take concrete configuration values as parameters; if it did, its signature would become unwieldy, and every new configuration item would force a change to the parameter list, which is not elegant. Instead, the options are passed uniformly as a variadic list of ServerOption values, each effectively a closure that mutates the option struct (the functional-options pattern).
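As an illustration, here is a minimal self-contained sketch of this functional-options idiom; all names in it are hypothetical, simplified from gRPC's ServerOption/apply machinery:

```go
package main

import "fmt"

// options is the private config struct; serverOptions plays this role in gRPC.
type options struct {
    port    string
    maxConn int
}

// Option is a closure over the config struct; grpc.ServerOption does the
// same job through its apply method.
type Option func(*options)

// WithPort and WithMaxConn are hypothetical options for illustration.
func WithPort(p string) Option { return func(o *options) { o.port = p } }

func WithMaxConn(n int) Option { return func(o *options) { o.maxConn = n } }

// NewServer starts from defaults and lets each option override one field,
// so adding a new option never changes this signature.
func NewServer(opts ...Option) *options {
    o := &options{port: ":50051", maxConn: 100}
    for _, opt := range opts {
        opt(o)
    }
    return o
}

func main() {
    cfg := NewServer(WithMaxConn(500))
    fmt.Printf("%+v\n", cfg)
}
```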
(2) The rest of the function initializes a Server struct whose fields matter a great deal. opts holds the server configuration. services maps each service name to a serviceInfo struct containing, among other things, that service's handler functions; it is populated when a new service is registered. chainUnaryServerInterceptors and chainStreamServerInterceptors set up the interceptor chains; an interceptor can be thought of as a hook around the service, usable for custom extensions of the request-handling logic. initServerWorkers sets up the server's worker goroutines, which carry out the business-logic processing. Finally, channelz.RegisterServer registers the server with channelz, an interface for collecting runtime statistics about the service, which can be enabled or disabled as needed.
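To make the interceptor idea concrete, here is a minimal sketch of a unary interceptor wrapped around every call; the logging body is illustrative, but grpc.UnaryInterceptor, grpc.UnaryServerInfo, and grpc.UnaryHandler are the real grpc-go types:

```go
package main

import (
    "context"
    "log"
    "time"

    "google.golang.org/grpc"
)

// loggingInterceptor runs before and after the actual service method,
// acting as the hook described above.
func loggingInterceptor(ctx context.Context, req interface{},
    info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    start := time.Now()
    resp, err := handler(ctx, req) // invoke the registered service method
    log.Printf("method=%s took=%v err=%v", info.FullMethod, time.Since(start), err)
    return resp, err
}

func main() {
    // The interceptor is installed through a ServerOption, which is exactly
    // what the opts/apply machinery above processes.
    s := grpc.NewServer(grpc.UnaryInterceptor(loggingInterceptor))
    _ = s
}
```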
(3) Pay particular attention to the initServerWorkers function, shown below. Its flow is initServerWorkers -> serverWorker -> handleStream: it spawns N goroutines that block on their channels waiting for input, and handleStream is the actual entry point for all business-logic processing!
```go
// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
    s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
    for i := uint32(0); i < s.opts.numServerWorkers; i++ {
        s.serverWorkerChannels[i] = make(chan *serverWorkerData)
        go s.serverWorker(s.serverWorkerChannels[i])
    }
}

// serverWorkers blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
    // To make sure all server workers don't reset at the same time, choose a
    // random number of iterations before resetting.
    threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
    for completed := 0; completed < threshold; completed++ {
        data, ok := <-ch
        if !ok {
            return
        }
        s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
        data.wg.Done()
    }
    go s.serverWorker(ch)
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    sm := stream.Method()
    if sm != "" && sm[0] == '/' {
        sm = sm[1:]
    }
    pos := strings.LastIndex(sm, "/")
    if pos == -1 {
        if trInfo != nil {
            trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
            trInfo.tr.SetError()
        }
        errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
        if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
            if trInfo != nil {
                trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
                trInfo.tr.SetError()
            }
            channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
        }
        if trInfo != nil {
            trInfo.tr.Finish()
        }
        return
    }
    service := sm[:pos]
    method := sm[pos+1:]
    srv, knownService := s.services[service]
    if knownService {
        if md, ok := srv.methods[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
        if sd, ok := srv.streams[method]; ok {
            s.processStreamingRPC(t, stream, srv, sd, trInfo)
            return
        }
    }
    // Unknown service, or known server unknown method.
    if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
        s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
        return
    }
    var errDesc string
    if !knownService {
        errDesc = fmt.Sprintf("unknown service %v", service)
    } else {
        errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
    }
    if trInfo != nil {
        trInfo.tr.LazyPrintf("%s", errDesc)
        trInfo.tr.SetError()
    }
    if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
        if trInfo != nil {
            trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
            trInfo.tr.SetError()
        }
        channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
    }
    if trInfo != nil {
        trInfo.tr.Finish()
    }
}
```
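Stripped of the gRPC specifics, the worker-pool pattern that initServerWorkers and serveStreams implement together looks roughly like the self-contained sketch below. All names here are illustrative; the point is that a fixed set of long-lived goroutines consumes tasks from per-worker channels, so request handling reuses warm goroutine stacks instead of growing a new stack per request:

```go
package main

import (
    "fmt"
    "sync"
)

// task stands in for gRPC's serverWorkerData: the unit of work plus the
// WaitGroup used to account for it.
type task struct {
    id int
    wg *sync.WaitGroup
}

// worker blocks on its channel, like serverWorker: the goroutine (and its
// already-grown stack) is reused across many tasks.
func worker(ch <-chan task) {
    for t := range ch {
        fmt.Printf("worker handled task %d\n", t.id)
        t.wg.Done()
    }
}

func main() {
    const numWorkers = 4
    channels := make([]chan task, numWorkers)
    for i := range channels {
        channels[i] = make(chan task) // one unbuffered channel per worker
        go worker(channels[i])
    }

    var wg sync.WaitGroup
    for i := 0; i < 16; i++ {
        wg.Add(1)
        t := task{id: i, wg: &wg}
        select {
        case channels[i%numWorkers] <- t: // round-robin dispatch
        default:
            // All workers busy: fall back to a fresh goroutine, the same
            // shape as the default branch in serveStreams.
            go func(t task) {
                fmt.Printf("fallback handled task %d\n", t.id)
                t.wg.Done()
            }(t)
        }
    }
    wg.Wait()
}
```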
2. Registering the business service
pb.RegisterHelloWorldServer(s, &GreeterServerImp{}) registers the helloworld service; the pb package is generated automatically from the .proto service definition. This function populates the services map (services: make(map[string]*serviceInfo)) described above.
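For reference, the generated registration code follows a fixed shape, roughly the sketch below. The names here follow the hypothetical pb package from the opening example; the real file is emitted by the protobuf compiler's Go gRPC plugin:

```go
// RegisterHelloWorldServer hands a service descriptor plus the user's
// implementation to the server, which records it in the services map.
func RegisterHelloWorldServer(s *grpc.Server, srv HelloWorldServer) {
    s.RegisterService(&_HelloWorld_serviceDesc, srv)
}

// The descriptor wires each method name to a generated handler stub that
// decodes the request and invokes the implementation (SayHi above).
var _HelloWorld_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.HelloWorld",
    HandlerType: (*HelloWorldServer)(nil),
    Methods: []grpc.MethodDesc{
        {MethodName: "SayHi", Handler: _HelloWorld_SayHi_Handler},
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}
```

It is this ServiceDesc that handleStream later consults: the service name keys into s.services, and the method name keys into the methods (or streams) map inside the matching serviceInfo.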
II. Starting the gRPC Service
The source code of s.Serve(lis):
```go
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
    ...
    for {
        rawConn, err := lis.Accept()
        ...
        // Start a new goroutine to deal with rawConn so we don't stall this Accept
        // loop goroutine.
        //
        // Make sure we account for the goroutine so GracefulStop doesn't nil out
        // s.conns before this conn can be added.
        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
    }
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
    if s.quit.HasFired() {
        rawConn.Close()
        return
    }
    rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
    conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    if err != nil {
        // ErrConnDispatched means that the connection was dispatched away from
        // gRPC; those connections should be left open.
        if err != credentials.ErrConnDispatched {
            s.mu.Lock()
            s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
            s.mu.Unlock()
            channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
            rawConn.Close()
        }
        rawConn.SetDeadline(time.Time{})
        return
    }

    // Finish handshaking (HTTP2)
    st := s.newHTTP2Transport(conn, authInfo)
    if st == nil {
        return
    }

    rawConn.SetDeadline(time.Time{})
    if !s.addConn(st) {
        return
    }
    go func() {
        s.serveStreams(st)
        s.removeConn(st)
    }()
}

func (s *Server) serveStreams(st transport.ServerTransport) {
    defer st.Close()
    var wg sync.WaitGroup

    var roundRobinCounter uint32
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        if s.opts.numServerWorkers > 0 {
            data := &serverWorkerData{st: st, wg: &wg, stream: stream}
            select {
            case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
            default:
                // If all stream workers are busy, fallback to the default code path.
                go func() {
                    s.handleStream(st, stream, s.traceInfo(st, stream))
                    wg.Done()
                }()
            }
        } else {
            go func() {
                defer wg.Done()
                s.handleStream(st, stream, s.traceInfo(st, stream))
            }()
        }
    }, func(ctx context.Context, method string) context.Context {
        if !EnableTracing {
            return ctx
        }
        tr := trace.New("grpc.Recv."+methodFamily(method), method)
        return trace.NewContext(ctx, tr)
    })
    wg.Wait()
}
```
The processing flow of s.Serve is Serve -> handleRawConn -> serveStreams:
1. Serve waits on lis.Accept in its for loop for incoming connection requests, and hands each accepted rawConn to a new goroutine so that the accept loop is never stalled;
2. handleRawConn processes the raw connection: it runs the security handshake via useTransportAuthenticator, then creates the HTTP/2 transport used for subsequent data processing via newHTTP2Transport;
3. serveStreams writes each received stream into the channel of the corresponding worker. The index expression atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers shows that the dispatch policy is round-robin, i.e. streams are spread evenly across the workers; note also the default branch of the select, which falls back to spawning a dedicated goroutine for the stream when the chosen worker is busy.
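Condensed to its skeleton, the accept-and-dispatch shape of steps 1 and 2 is the classic Go listener loop below. This is only a sketch with simplified error handling, not gRPC's actual implementation:

```go
package main

import (
    "log"
    "net"
    "sync"
)

func main() {
    lis, err := net.Listen("tcp", ":9000")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    var wg sync.WaitGroup
    for {
        conn, err := lis.Accept() // step 1: block until a client connects
        if err != nil {
            break // a real server distinguishes fatal from temporary errors
        }
        wg.Add(1) // account for the goroutine, as Serve does with serveWG
        go func(c net.Conn) { // never stall the accept loop
            defer wg.Done()
            defer c.Close()
            // step 2: handshake and per-stream dispatch would happen here
        }(conn)
    }
    wg.Wait()
}
```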
