gRPC 服务端和客户端源码分析(golang)
第一部分 gRPC介绍
gRPC是什么,A high-performance, open-source universal RPC framework。RPC是什么,remote procedure call,远程过程调用,应用程序之间使用RPC通信,函数调用与本地调用无异。
gRPC提供一套server/client模型通信机制,其特点如下:
使用http2.0作为底层传输协议,支持流式通信,多路复用( 多路复用原理:http1.1基于“文本分割”,服务端读取直至遇到分隔符,一次只能处理一个请求或响应,且数据解析无法预知内存大小。http2.0基于二进制“帧”,定义不同类型帧,请求和响应交错复用。流式通信原理:http2.0连接上独立的、双向的帧序列交流。帧首部6-9字节表流ID,用来标识帧所属的流 );
可使用protobuf定义接口数据,二进制编码,减少需要传输的数据量( protobuf轻量:与json比较,json为key-value,每个key都占用多个字节,int类型固定字节数。protobuf使用tag,编号标记key,仅占一字节,支持Varint,数据按实际长度存储)
解决不同语言间通信的复杂性以及环境的不同
一次性在一个.proto文件中定义服务,并使用任何支持它的语言去实现客户端和服务器
gRPC的使用场景与restful API相同。
第二部分 gRPC通信过程源码分析
gRPC提供一套server/client端通信机制,源码分析立足于分析利用gRPC进行模块调用整个通信过程。结合文章中的代码,梳理gRPC通信过程源码。此部分共分为三个小节,分别是:
客户端及服务端入口代码解析
客户端请求逻辑
服务端处理逻辑
1:Server端及Client端入口代码解析
如果项目中,你只需要简单的利用gRPC远程调用函数。由上图可知,用户只需关心一下几点:
服务端:
注册并开启服务
持续监听client请求
客户端:
与server端建立连接
初始化客户端并构建msg
请求服务端,并接收响应数据
其中,客户端请求c.SayHello()指定调用 "/proto.Hello/SayHello"服务端定义的helloService.SayHello()。由helloService.SayHello()的执行逻辑可知,其将client端请求的“hello world!”消息直接返回client端。
那么,client端的request是如何发送值server端,server端接收消息,调用helloService.SayHello()函数后,是如何将消息返回client端的,是接下来我们具体研究的内容。
2、客户端请求逻辑
client调用grpc.Dial(),返回一个ClientConn,底层本质上调用newHTTP2Client,与server建立http2连接。
func newHTTP2Client(), onClose func()) (_ *http2Client, err error) {//dial一个tcp连接conn, err := dial(connectCtx, opts.Dialer, addr.Addr)//循环读取帧,并按类型分发入流go t.reader()// Send connection preface to server.n, err := t.conn.Write(clientPreface)/*************省略***********/}
调用grpc.Dial()之后,client和server的连接已经建立起来了,此时需初始化客户端对象:
c := pb.NewHelloClient(conn)//创建helloClient实例func NewHelloClient(cc grpc.ClientConnInterface) HelloClient {return &helloClient{cc}}
初始化的helloClient向server发起一次请求msg,即调用hello.pb.go文件中的helloClient.SayHello()方法,其最终还是调用grpc.Invoke()。
r, err := c.SayHello(context.Background(), reqBody)//调用helloClient对象的的SayHello()方法,并传入参数func (c *helloClient) SayHello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Reply, error) {out := new(Reply)err := c.cc.Invoke(ctx, "/proto.Hello/SayHello", in, out, opts...)//if err != nil {return nil, err}return out, nil}
grpc.Invoke()最终调用的是invoke()方法,invoke()表示一次请求过程,包括:
新建clientStream对象,标记调用“/proto.Hello/SayHello” Method,监听用户是否关闭此连接
发送请求SendMsg(),底层调用http2client.Write方法发送req。底层调用函数依次为clientStream.SendMsg()->csAttempt.sendMsg()->http2Client.Write()
接收响应数据RecvMsg(),底层调用函数关系依次为RecvMsg()->recvMsg()->recv()->Stream.Read()->io.ReadFull()->io.ReadAtLeast() ->transportReader.Read()->recvBufferReader.Read()->recvBufferReader.readClient()。readClient从接收数据channel里读取数据。接收数据channel数据依赖http2协议写入。
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)if err != nil {return err}if err := cs.SendMsg(req); err != nil {return err}return cs.RecvMsg(reply)}/**********************SendMsg**********************/func (cs *clientStream) SendMsg(m interface{}) (err error) {err := a.sendMsg(m, hdr, payload, data)}func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})}func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {df := &dataFrame{streamID: s.id,endStream: opts.Last,}return t.controlBuf.put(df)}/**********************RecvMsg**********************/func (cs *clientStream) RecvMsg(m interface{}) error {return a.recvMsg(m, recvInfo)}func (r *recvBufferReader) readClient(p []byte) (n int, err error) {select {case <-r.ctxDone:m := <-r.recv.get()return r.readAdditional(m, p)case m := <-r.recv.get():return r.readAdditional(m, p)}}//从接收数据channel中读取数据func (b *recvBuffer) get() <-chan recvMsg {return b.c}
3、服务端处理逻辑
grpc_server.go文件开启服务监听,接收并处理客户端发送的请求
s.Serve(listen)//阻塞监听客户端请求func (s *Server) Serve(lis net.Listener) error {for {rawConn, err := lis.Accept()go func() {s.handleRawConn(rawConn)}()}}
接下来我们看看server端如何处理客户端发送的请求。调用Server.handleRawConn完成客户端身份认证并进行stream处理。stream处理主要进行handler函数匹配并发送response。
func (s *Server) handleRawConn(rawConn net.Conn) {//身份认证并完成握手连接conn, authInfo, err := s.useTransportAuthenticator(rawConn)st := s.newHTTP2Transport(conn, authInfo)if st == nil {return}//stream处理go func() {s.serveStreams(st)}()}func (s *Server) serveStreams(st transport.ServerTransport) {defer st.Close()var wg sync.WaitGroupst.HandleStreams(func(stream *transport.Stream) {wg.Add(1)go func() {defer wg.Done()s.handleStream(st, stream, s.traceInfo(st, stream))}()}, func(ctx context.Context, method string) context.Context {if !EnableTracing {return ctx}tr := trace.New("grpc.Recv."+methodFamily(method), method)return trace.NewContext(ctx, tr)})wg.Wait()}// HandleStreams receives incoming streams using the given handler.func (ht *serverHandlerTransport) HandleStreams() {handleStream()ht.runStream()}HandleStreams调用handleStream()func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {s.processUnaryRPC(t, stream, srv, md, trInfo)}func (s *Server) processUnaryRPC(){调用处理函数reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)发送处理数据err := s.sendResponse(t, stream, reply, cp, opts, comp)}//sendResponse()调用Write()构建发送Msgfunc (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {return ht.do(func() {ht.writeCommonHeaders(s)ht.rw.Write(hdr)ht.rw.Write(data)ht.rw.(http.Flusher).Flush()})}
总结:gRPC提供一套server/client模型通信机制,底层通信依赖http2。样例中server与client端通信依赖protobuf文件,封装http协议客户端与服务端信息交互机制,client端最终调用http2.Write()发送数据,server端接收request,匹配handler函数,发送response结果给client。
下一篇,将对HTTP协议server端与client源码进行分析。
