gRPC Framework Source Code Analysis
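RPC (Remote Procedure Call) means calling a procedure on a remote service, and it is widely used in backend development. A common approach is RESTful APIs, which implement calls between remote services with HTTP methods such as POST and GET. gRPC, the subject of this article, is one of many inter-service call frameworks. It was developed by Google, has implementations in Go, C++, Java and many other languages, and all of its code is open source (https://github.com/grpc/grpc). Since its release, gRPC has been widely adopted across the industry, and it shows up to some degree at most major Internet companies in China and abroad.
This analysis uses the grpc-go implementation (https://google.golang.org/grpc, https://github.com/grpc/grpc-go). As the code below shows, starting a gRPC HelloWorld service is simple and takes three steps: 1. in main, call net.Listen to listen on a service port; 2. call grpc.NewServer and pb.RegisterHelloWorldServer to initialize the gRPC framework and register the business service; 3. call s.Serve to start the gRPC service.
Only steps 2 and 3 involve gRPC itself, so the rest of this analysis is organized around those two parts.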
package main

import (
	"context"
	"google.golang.org/grpc"
	"log"
	"net"
	pb "xxx/xxx/helloworld"
)

const (
	gport = ":50051"
)

// GreeterServerImp is used to implement helloworld.HelloWorldServer.
type GreeterServerImp struct{}

// SayHi implements helloworld.HelloWorldServer.
func (s *GreeterServerImp) SayHi(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
	log.Printf("Received: %v", in.GetMsg())
	return &pb.HelloReply{Msg: "Hello " + in.GetMsg()}, nil
}

func main() {
	// Step 1: listen on the service port.
	lis, err := net.Listen("tcp", gport)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	// Step 2: initialize the framework and register the business service.
	s := grpc.NewServer()
	pb.RegisterHelloWorldServer(s, &GreeterServerImp{})
	// Step 3: start serving; Serve blocks until the server stops.
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
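I. gRPC Service Initialization
gRPC service initialization has two parts: 1. grpc.NewServer initializes the framework: it builds the server's configuration and, if a worker count has been configured (with the experimental grpc.NumStreamWorkers option; the default is 0, in which case no workers are started), starts N worker goroutines that wait for incoming requests; 2. pb.RegisterHelloWorldServer registers the business server, which is where the application-specific logic actually comes in.
1. Framework Initialization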
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}
	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}
	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}
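(1) opts := defaultServerOptions and the loop that follows initialize the service configuration using a common Go idiom, the functional options pattern. NewServer does not take each setting as its own parameter: that would make its parameter list unwieldy, and every new setting would force a change to the signature, which is not an elegant design. Instead, each setting is wrapped in a ServerOption value that carries a closure, so the parameter list is simply a variadic list of options, and each option's apply method writes its setting into opts.
(2) The s := &Server{...} literal initializes the Server struct, the framework's central data structure, whose fields drive most of what follows. opts holds the service configuration, which users set selectively as needed; services is the service table, a map from a service name to a serviceInfo that holds the service's handler functions and related data. Registering a new service essentially adds one entry to this map.
(3) chainUnaryServerInterceptors and chainStreamServerInterceptors set up the interceptors. Interceptors act as hooks into the gRPC framework and are commonly used to add user-defined logic around request handling; these two calls combine the configured interceptors into a single chain for unary and streaming RPCs respectively.
(4) When s.opts.numServerWorkers is greater than 0, s.initServerWorkers() sets up the business workers that process requests. This is where framework code and business code meet most closely. The call path is initServerWorkers -> serverWorker -> handleStream, and the three functions are shown in the code below. initServerWorkers creates one channel per worker and starts N goroutines; each one waits for data on its channel and then invokes the business logic, with handleStream serving as the entry point for all business-logic processing. After handling a randomized number of requests, a worker starts a fresh goroutine on the same channel and exits, so that large goroutine stacks do not stay in memory forever.
(5) If channelz is enabled, channelz.RegisterServer registers this server with channelz, gRPC's built-in facility for collecting runtime statistics about the service; whether to turn it on depends on your needs.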
// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
	s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
		s.serverWorkerChannels[i] = make(chan *serverWorkerData)
		go s.serverWorker(s.serverWorkerChannels[i])
	}
}

// serverWorkers blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
	// To make sure all server workers don't reset at the same time, choose a
	// random number of iterations before resetting.
	threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
	for completed := 0; completed < threshold; completed++ {
		data, ok := <-ch
		if !ok {
			return
		}
		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
		data.wg.Done()
	}
	go s.serverWorker(ch)
}

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
	sm := stream.Method()
	if sm != "" && sm[0] == '/' {
		sm = sm[1:]
	}
	pos := strings.LastIndex(sm, "/")
	if pos == -1 {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
			trInfo.tr.SetError()
		}
		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
			if trInfo != nil {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
			channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
		}
		if trInfo != nil {
			trInfo.tr.Finish()
		}
		return
	}
	service := sm[:pos]
	method := sm[pos+1:]
	srv, knownService := s.services[service]
	if knownService {
		if md, ok := srv.methods[method]; ok {
			s.processUnaryRPC(t, stream, srv, md, trInfo)
			return
		}
		if sd, ok := srv.streams[method]; ok {
			s.processStreamingRPC(t, stream, srv, sd, trInfo)
			return
		}
	}
	// Unknown service, or known server unknown method.
	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
		return
	}
	var errDesc string
	if !knownService {
		errDesc = fmt.Sprintf("unknown service %v", service)
	} else {
		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
	}
	if trInfo != nil {
		trInfo.tr.LazyPrintf("%s", errDesc)
		trInfo.tr.SetError()
	}
	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
		if trInfo != nil {
			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
			trInfo.tr.SetError()
		}
		channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
	}
	if trInfo != nil {
		trInfo.tr.Finish()
	}
}
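2. Registering the Business Service
pb.RegisterHelloWorldServer(...) registers a helloworld service; this function is generated automatically from the helloworld proto file. It calls s.RegisterService, which first uses reflection to check that srv implements the interface recorded in the service descriptor (sd.HandlerType) and then calls the internal register method. register fills in the services: make(map[string]*serviceInfo) map described above, adding the new business service to the gRPC framework; registering after Serve has started, or registering the same service name twice, is a fatal error.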
func RegisterHelloWorldServer(s *grpc.Server, srv HelloWorldServer) {
	s.RegisterService(&_HelloWorld_serviceDesc, srv)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.printf("RegisterService(%q)", sd.ServiceName)
	if s.serve {
		logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
	}
	if _, ok := s.services[sd.ServiceName]; ok {
		logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
	}
	info := &serviceInfo{
		serviceImpl: ss,
		methods:     make(map[string]*MethodDesc),
		streams:     make(map[string]*StreamDesc),
		mdata:       sd.Metadata,
	}
	for i := range sd.Methods {
		d := &sd.Methods[i]
		info.methods[d.MethodName] = d
	}
	for i := range sd.Streams {
		d := &sd.Streams[i]
		info.streams[d.StreamName] = d
	}
	s.services[sd.ServiceName] = info
}
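The registration checks can likewise be sketched with the standard library alone. This is a simplified model under stated assumptions, not the generated code or grpc-go itself: HelloWorldServer, serviceDesc, registry and registerService are hypothetical, method descriptors are reduced to names, and errors are returned where grpc-go calls logger.Fatalf. It shows the reflection check performed on sd.HandlerType, followed by the "already serving" and duplicate-name checks from register.

```go
package main

import (
	"fmt"
	"reflect"
	"sync"
)

// HelloWorldServer stands in for the interface protoc would generate (hypothetical).
type HelloWorldServer interface {
	SayHi(msg string) string
}

// serviceDesc is a simplified, hypothetical stand-in for grpc.ServiceDesc.
type serviceDesc struct {
	ServiceName string
	HandlerType interface{} // nil pointer to the interface, e.g. (*HelloWorldServer)(nil)
	Methods     []string
}

type registry struct {
	mu       sync.Mutex
	serving  bool                       // plays the role of Server.serve
	services map[string]map[string]bool // service name -> method names
}

func newRegistry() *registry {
	return &registry{services: make(map[string]map[string]bool)}
}

// registerService mimics RegisterService + register, returning errors
// where grpc-go calls logger.Fatalf.
func (r *registry) registerService(sd *serviceDesc, impl interface{}) error {
	if impl != nil {
		ht := reflect.TypeOf(sd.HandlerType).Elem() // the interface type itself
		if st := reflect.TypeOf(impl); !st.Implements(ht) {
			return fmt.Errorf("handler of type %v does not satisfy %v", st, ht)
		}
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.serving {
		return fmt.Errorf("RegisterService after Serve for %q", sd.ServiceName)
	}
	if _, ok := r.services[sd.ServiceName]; ok {
		return fmt.Errorf("duplicate service registration for %q", sd.ServiceName)
	}
	methods := make(map[string]bool, len(sd.Methods))
	for _, m := range sd.Methods {
		methods[m] = true
	}
	r.services[sd.ServiceName] = methods
	return nil
}

type greeter struct{}

func (greeter) SayHi(msg string) string { return "Hello " + msg }

var helloDesc = serviceDesc{
	ServiceName: "helloworld.HelloWorld",
	HandlerType: (*HelloWorldServer)(nil),
	Methods:     []string{"SayHi"},
}

func main() {
	r := newRegistry()
	fmt.Println(r.registerService(&helloDesc, greeter{})) // <nil>
	fmt.Println(r.registerService(&helloDesc, greeter{})) // duplicate registration error
	bad := &serviceDesc{ServiceName: "other", HandlerType: (*HelloWorldServer)(nil)}
	fmt.Println(r.registerService(bad, 42)) // an int does not satisfy HelloWorldServer
}
```

Storing a typed nil pointer in HandlerType is what lets the check recover the interface type at run time: reflect.TypeOf gives the pointer type, and Elem gives the interface it points to.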
II. Starting the gRPC Service
The gRPC service is started by the s.Serve function. Its processing flow is Serve -> handleRawConn -> serveStreams, implemented as follows:
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
	...
	for {
		rawConn, err := lis.Accept()
		...
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}

// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
	if s.quit.HasFired() {
		rawConn.Close()
		return
	}
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		// ErrConnDispatched means that the connection was dispatched away from
		// gRPC; those connections should be left open.
		if err != credentials.ErrConnDispatched {
			s.mu.Lock()
			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
			s.mu.Unlock()
			channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
			rawConn.Close()
		}
		rawConn.SetDeadline(time.Time{})
		return
	}
	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		return
	}
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}

func (s *Server) serveStreams(st transport.ServerTransport) {
	defer st.Close()
	var wg sync.WaitGroup
	var roundRobinCounter uint32
	st.HandleStreams(func(stream *transport.Stream) {
		wg.Add(1)
		if s.opts.numServerWorkers > 0 {
			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
			select {
			case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
			default:
				// If all stream workers are busy, fallback to the default code path.
				go func() {
					s.handleStream(st, stream, s.traceInfo(st, stream))
					wg.Done()
				}()
			}
		} else {
			go func() {
				defer wg.Done()
				s.handleStream(st, stream, s.traceInfo(st, stream))
			}()
		}
	}, func(ctx context.Context, method string) context.Context {
		if !EnableTracing {
			return ctx
		}
		tr := trace.New("grpc.Recv."+methodFamily(method), method)
		return trace.NewContext(ctx, tr)
	})
	wg.Wait()
}
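(1) In Serve, lis.Accept() waits for client connection requests. Each accepted connection is handed to handleRawConn in its own goroutine so that the accept loop is never stalled, and serveWG lets GracefulStop account for these goroutines.
(2) handleRawConn processes the raw connection. It sets a deadline of s.opts.connectionTimeout, performs the transport security handshake through useTransportAuthenticator and, because gRPC runs over HTTP/2, calls newHTTP2Transport to build an HTTP/2 server transport on top of the connection. It then clears the deadline, records the transport with addConn, and runs serveStreams in a new goroutine.
(3) serveStreams hands each incoming stream to a worker by sending it on that worker's channel. The index atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers shows that workers are chosen round-robin. If the chosen worker is not ready to receive, the select falls through to default and the stream is handled in a new goroutine instead; if no workers are configured at all, every stream gets its own goroutine.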