vlambda博客
学习文章列表

gRPC 服务端和客户端源码分析(golang)

第一部分 gRPC介绍

gRPC是什么,A high-performance, open-source universal RPC framework。RPC是什么,remote procedure call,远程过程调用,应用程序之间使用RPC通信,函数调用与本地调用无异。

gRPC提供一套server/client模型通信机制,其特点如下:


  • 使用http2.0作为底层传输协议,支持流式通信,多路复用( 多路复用原理http1.1基于“文本分割”,服务端读取直至遇到分隔符,一次只能处理一个请求或响应,且数据解析无法预知内存大小。http2.0基于二进制“帧”,定义不同类型帧,请求和响应交错复用。流式通信原理http2.0连接上独立的、双向的帧序列交流。帧首部6-9字节表流ID,用来标识帧所属的流 );

  • 可使用protobuf定义接口数据,二进制编码,减少需要传输的数据量( protobuf轻量与json比较,json为key-value,每个key都占用多个字节,int类型固定字节数。protobuf使用tag,编号标记key,仅占一字节,支持Varint,数据按实际长度存储)

  • 解决不同语言间通信的复杂性以及环境的不同

  • 一次性在一个.proto文件中定义服务,并使用任何支持它的语言去实现客户端和服务器

gRPC的使用场景与restful API相同。


第二部分 gRPC通信过程源码分析

gRPC提供一套server/client端通信机制,源码分析立足于分析利用gRPC进行模块调用整个通信过程。结合文章中的代码,梳理gRPC通信过程源码。此部分共分为三个小节,分别是:

  • 客户端及服务端入口代码解析

  • 客户端请求逻辑

  • 服务端处理逻辑

1:Server端及Client端入口代码解析


如果项目中,你只需要简单的利用gRPC远程调用函数。由上图可知,用户只需关心一下几点:

服务端:

  • 注册并开启服务

  • 持续监听client请求

客户端:

  • 与server端建立连接

  • 初始化客户端并构建msg

  • 请求服务端,并接收响应数据

其中,客户端请求c.SayHello()指定调用 "/proto.Hello/SayHello"服务端定义的helloService.SayHello()。由helloService.SayHello()的执行逻辑可知,其将client端请求的“hello world!”消息直接返回client端。

那么,client端的request是如何发送值server端,server端接收消息,调用helloService.SayHello()函数后,是如何将消息返回client端的,是接下来我们具体研究的内容。

2、客户端请求逻辑

client调用grpc.Dial(),返回一个ClientConn,底层本质上调用newHTTP2Client,与server建立http2连接。

func newHTTP2Client(), onClose func()) (_ *http2Client, err error) { //dial一个tcp连接 conn, err := dial(connectCtx, opts.Dialer, addr.Addr)  //循环读取帧,并按类型分发入流 go t.reader() // Send connection preface to server. n, err := t.conn.Write(clientPreface) /*************省略***********/}


调用grpc.Dial()之后,client和server的连接已经建立起来了,此时需初始化客户端对象:

c := pb.NewHelloClient(conn)//创建helloClient实例
func NewHelloClient(cc grpc.ClientConnInterface) HelloClient { return &helloClient{cc}}

初始化的helloClient向server发起一次请求msg,即调用hello.pb.go文件中的helloClient.SayHello()方法,其最终还是调用grpc.Invoke()。

 r, err := c.SayHello(context.Background(), reqBody)//调用helloClient对象的的SayHello()方法,并传入参数
func (c *helloClient) SayHello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Reply, error) { out := new(Reply) err := c.cc.Invoke(ctx, "/proto.Hello/SayHello", in, out, opts...)// if err != nil { return nil, err } return out, nil}

grpc.Invoke()最终调用的是invoke()方法,invoke()表示一次请求过程,包括:

  • 新建clientStream对象,标记调用“/proto.Hello/SayHello” Method,监听用户是否关闭此连接

  • 发送请求SendMsg(),底层调用http2client.Write方法发送req。底层调用函数依次为clientStream.SendMsg()->csAttempt.sendMsg()->http2Client.Write()

  • 接收响应数据RecvMsg(),底层调用函数关系依次为RecvMsg()->recvMsg()->recv()->Stream.Read()->io.ReadFull()->io.ReadAtLeast() ->transportReader.Read()->recvBufferReader.Read()->recvBufferReader.readClient()。readClient从接收数据channel里读取数据。接收数据channel数据依赖http2协议写入。

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) if err != nil { return err } if err := cs.SendMsg(req); err != nil { return err } return cs.RecvMsg(reply)}/**********************SendMsg**********************/func (cs *clientStream) SendMsg(m interface{}) (err error) { err := a.sendMsg(m, hdr, payload, data)}
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})}
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { df := &dataFrame{ streamID: s.id, endStream: opts.Last, } return t.controlBuf.put(df)}
/**********************RecvMsg**********************/func (cs *clientStream) RecvMsg(m interface{}) error { return a.recvMsg(m, recvInfo)}func (r *recvBufferReader) readClient(p []byte) (n int, err error) { select { case <-r.ctxDone: m := <-r.recv.get() return r.readAdditional(m, p) case m := <-r.recv.get(): return r.readAdditional(m, p) }}//从接收数据channel中读取数据func (b *recvBuffer) get() <-chan recvMsg { return b.c}


3、服务端处理逻辑
grpc_server.go文件开启服务监听,接收并处理客户端发送的请求

s.Serve(listen)//阻塞监听客户端请求
func (s *Server) Serve(lis net.Listener) error { for { rawConn, err := lis.Accept() go func() { s.handleRawConn(rawConn) }() }}

接下来我们看看server端如何处理客户端发送的请求。调用Server.handleRawConn完成客户端身份认证并进行stream处理。stream处理主要进行handler函数匹配并发送response。

func (s *Server) handleRawConn(rawConn net.Conn) { //身份认证并完成握手连接 conn, authInfo, err := s.useTransportAuthenticator(rawConn) st := s.newHTTP2Transport(conn, authInfo) if st == nil { return } //stream处理 go func() { s.serveStreams(st) }()}
func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) go func() { defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) }() }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx } tr := trace.New("grpc.Recv."+methodFamily(method), method) return trace.NewContext(ctx, tr) }) wg.Wait()}
// HandleStreams receives incoming streams using the given handler.func (ht *serverHandlerTransport) HandleStreams() { handleStream() ht.runStream()}
HandleStreams调用handleStream()func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { s.processUnaryRPC(t, stream, srv, md, trInfo)}
func (s *Server) processUnaryRPC(){ 调用处理函数 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) 发送处理数据 err := s.sendResponse(t, stream, reply, cp, opts, comp)}
//sendResponse()调用Write()构建发送Msgfunc (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { return ht.do(func() { ht.writeCommonHeaders(s) ht.rw.Write(hdr) ht.rw.Write(data) ht.rw.(http.Flusher).Flush() })}

总结:gRPC提供一套server/client模型通信机制,底层通信依赖http2。样例中server与client端通信依赖protobuf文件,封装http协议客户端与服务端信息交互机制,client端最终调用http2.Write()发送数据,server端接收request,匹配handler函数,发送response结果给client。


下一篇,将对HTTP协议server端与client源码进行分析。