grpc框架源码分析
grpc框架使用有不少文章都有介绍,本文不再详细说明。启动一个grpc服务看起来非常简单,如下图所示,main函数中首先第行监听某个指定端口,然后第行~第行初始化grpc服务框架,接着第行启动grpc服务。
package main
import (
"context"
"google.golang.org/grpc"
"log"
"net"
pb "xxx/xxx/helloworld"
)
const (
gport = ":50051"
)
// GreeterServerImp is used to implement helloworld.GreeterServer.
type GreeterServerImp struct{}
// SayHello implements helloworld.GreeterServer
func (s *GreeterServerImp) SayHi(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetMsg())
return &pb.HelloReply{Msg: "Hello " + in.GetMsg()}, nil
}
func main() {
lis, err := net.Listen("tcp", gport)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterHelloWorldServer(s, &GreeterServerImp{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
一、grpc服务的初始化
1、NewServer的源代码如下:
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s)
s.cv = sync.NewCond(&s.mu)
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}
if s.opts.numServerWorkers > 0 {
s.initServerWorkers()
}
if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
return s
}
(1)第4行~第7行初始化服务的配置,这里用到golang配置初始化常见的一个技巧,NewServer并不以具体参数做为入参,否则NewServer的入参将变得非常繁琐,而且新增配置项势必要修改NewServer入参列表,实现不太优雅。这里采用了传递函数闭包的办法,将入参统一表示为闭包函数数组。
(2)第8行~第30行初始化了一个Server结构体,Server中的字段特别重要。opts表示服务配置;services表示服务service列表,它是一个service名到包含service处理函数等参数的结构体的map,新service的注册会用到它;第行到第行初始化服务的拦截器配置,拦截器可以看成服务的钩子函数,可用于用户自定义的拓展服务逻辑。第行初始化了服务的workder结构,被用于业务逻辑的处理;第30行注册了zChannel,这个接口被用于统计服务的运行情况,它可根据实际需要配置是否需要。
(3)需要特别注意的是initServerWorkers这个函数的代码如下。这个函数的流程是initServerWorkers -> serverWorker -> handleStream。它将新起N个gorouting,并等待数据输入进行业务逻辑处理,handleStream实际就是所有业务逻辑处理的函数入口!
// initServerWorkers creates worker goroutines and channels to process incoming
// connections to reduce the time spent overall on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
s.serverWorkerChannels[i] = make(chan *serverWorkerData)
go s.serverWorker(s.serverWorkerChannels[i])
}
}
// serverWorkers blocks on a *transport.Stream channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker(ch chan *serverWorkerData) {
// To make sure all server workers don't reset at the same time, choose a
// random number of iterations before resetting.
threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
for completed := 0; completed < threshold; completed++ {
data, ok := <-ch
if !ok {
return
}
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
data.wg.Done()
}
go s.serverWorker(ch)
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
}
pos := strings.LastIndex(sm, "/")
if pos == -1 {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
return
}
service := sm[:pos]
method := sm[pos+1:]
srv, knownService := s.services[service]
if knownService {
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
// Unknown service, or known server unknown method.
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
var errDesc string
if !knownService {
errDesc = fmt.Sprintf("unknown service %v", service)
} else {
errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
}
if trInfo != nil {
trInfo.tr.LazyPrintf("%s", errDesc)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
}
}
2、业务服务的注册
pb.RegisterHelloWorldServer(s, &GreeterServerImp{})注册了一个helloworld的服务,pb这个服务是由proto服务文件自动生成的。这个函数设置了上述的services: make(map[string]*serviceInfo)结构。
二、grpc服务的启动
s.Serve(lis)的源代码如下:
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
...
for {
rawConn, err := lis.Accept()
...
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
}
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
// ErrConnDispatched means that the connection was dispatched away from
// gRPC; those connections should be left open.
if err != credentials.ErrConnDispatched {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
channelz.Warningf(logger, s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
rawConn.Close()
}
rawConn.SetDeadline(time.Time{})
return
}
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
rawConn.SetDeadline(time.Time{})
if !s.addConn(st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
var roundRobinCounter uint32
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
default:
// If all stream workers are busy, fallback to the default code path.
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
s.Serve的处理流程:Serve -> handleRawConn -> serveStreams:
1、Serve 中第10行等待接收连接请求;
2、handleRawConn中处理原始的conn请求,并在第49行新建一个数据处理conn;
3、serveStreams第74行将接收到的数据写到对应频道的worker数据缓存,由atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers可见worker的调用策略即为平均分配。