gRPC 服务端和客户端源码分析(golang)
第一部分 gRPC介绍
gRPC是什么,A high-performance, open-source universal RPC framework。RPC是什么,remote procedure call,远程过程调用,应用程序之间使用RPC通信,函数调用与本地调用无异。
gRPC提供一套server/client模型通信机制,其特点如下:
使用http2.0作为底层传输协议,支持流式通信,多路复用( 多路复用原理:http1.1基于“文本分割”,服务端读取直至遇到分隔符,一次只能处理一个请求或响应,且数据解析无法预知内存大小。http2.0基于二进制“帧”,定义不同类型帧,请求和响应交错复用。流式通信原理:http2.0连接上独立的、双向的帧序列交流。帧首部6-9字节表流ID,用来标识帧所属的流 );
可使用protobuf定义接口数据,二进制编码,减少需要传输的数据量( protobuf轻量:与json比较,json为key-value,每个key都占用多个字节,int类型固定字节数。protobuf使用tag,编号标记key,仅占一字节,支持Varint,数据按实际长度存储)
解决不同语言间通信的复杂性以及环境的不同
一次性在一个.proto文件中定义服务,并使用任何支持它的语言去实现客户端和服务器
gRPC的使用场景与restful API相同。
第二部分 gRPC通信过程源码分析
gRPC提供一套server/client端通信机制,源码分析立足于分析利用gRPC进行模块调用整个通信过程。结合文章中的代码,梳理gRPC通信过程源码。此部分共分为三个小节,分别是:
客户端及服务端入口代码解析
客户端请求逻辑
服务端处理逻辑
1:Server端及Client端入口代码解析
如果项目中,你只需要简单的利用gRPC远程调用函数。由上图可知,用户只需关心一下几点:
服务端:
注册并开启服务
持续监听client请求
客户端:
与server端建立连接
初始化客户端并构建msg
请求服务端,并接收响应数据
其中,客户端请求c.SayHello()指定调用 "/proto.Hello/SayHello"服务端定义的helloService.SayHello()。由helloService.SayHello()的执行逻辑可知,其将client端请求的“hello world!”消息直接返回client端。
那么,client端的request是如何发送值server端,server端接收消息,调用helloService.SayHello()函数后,是如何将消息返回client端的,是接下来我们具体研究的内容。
2、客户端请求逻辑
client调用grpc.Dial(),返回一个ClientConn,底层本质上调用newHTTP2Client,与server建立http2连接。
func newHTTP2Client(), onClose func()) (_ *http2Client, err error) {
//dial一个tcp连接
conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
//循环读取帧,并按类型分发入流
go t.reader()
// Send connection preface to server.
n, err := t.conn.Write(clientPreface)
/*************省略***********/
}
调用grpc.Dial()之后,client和server的连接已经建立起来了,此时需初始化客户端对象:
c := pb.NewHelloClient(conn)//创建helloClient实例
func NewHelloClient(cc grpc.ClientConnInterface) HelloClient {
return &helloClient{cc}
}
初始化的helloClient向server发起一次请求msg,即调用hello.pb.go文件中的helloClient.SayHello()方法,其最终还是调用grpc.Invoke()。
r, err := c.SayHello(context.Background(), reqBody)//调用helloClient对象的的SayHello()方法,并传入参数
func (c *helloClient) SayHello(ctx context.Context, in *Request, opts ...grpc.CallOption) (*Reply, error) {
out := new(Reply)
err := c.cc.Invoke(ctx, "/proto.Hello/SayHello", in, out, opts...)//
if err != nil {
return nil, err
}
return out, nil
}
grpc.Invoke()最终调用的是invoke()方法,invoke()表示一次请求过程,包括:
新建clientStream对象,标记调用“/proto.Hello/SayHello” Method,监听用户是否关闭此连接
发送请求SendMsg(),底层调用http2client.Write方法发送req。底层调用函数依次为clientStream.SendMsg()->csAttempt.sendMsg()->http2Client.Write()
接收响应数据RecvMsg(),底层调用函数关系依次为RecvMsg()->recvMsg()->recv()->Stream.Read()->io.ReadFull()->io.ReadAtLeast() ->transportReader.Read()->recvBufferReader.Read()->recvBufferReader.readClient()。readClient从接收数据channel里读取数据。接收数据channel数据依赖http2协议写入。
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
/**********************SendMsg**********************/
func (cs *clientStream) SendMsg(m interface{}) (err error) {
err := a.sendMsg(m, hdr, payload, data)
}
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams})
}
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
}
return t.controlBuf.put(df)
}
/**********************RecvMsg**********************/
func (cs *clientStream) RecvMsg(m interface{}) error {
return a.recvMsg(m, recvInfo)
}
func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
select {
case <-r.ctxDone:
m := <-r.recv.get()
return r.readAdditional(m, p)
case m := <-r.recv.get():
return r.readAdditional(m, p)
}
}
//从接收数据channel中读取数据
func (b *recvBuffer) get() <-chan recvMsg {
return b.c
}
3、服务端处理逻辑
grpc_server.go文件开启服务监听,接收并处理客户端发送的请求
s.Serve(listen)//阻塞监听客户端请求
func (s *Server) Serve(lis net.Listener) error {
for {
rawConn, err := lis.Accept()
go func() {
s.handleRawConn(rawConn)
}()
}
}
接下来我们看看server端如何处理客户端发送的请求。调用Server.handleRawConn完成客户端身份认证并进行stream处理。stream处理主要进行handler函数匹配并发送response。
func (s *Server) handleRawConn(rawConn net.Conn) {
//身份认证并完成握手连接
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
//stream处理
go func() {
s.serveStreams(st)
}()
}
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
}
tr := trace.New("grpc.Recv."+methodFamily(method), method)
return trace.NewContext(ctx, tr)
})
wg.Wait()
}
// HandleStreams receives incoming streams using the given handler.
func (ht *serverHandlerTransport) HandleStreams() {
handleStream()
ht.runStream()
}
HandleStreams调用handleStream()
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
s.processUnaryRPC(t, stream, srv, md, trInfo)
}
func (s *Server) processUnaryRPC(){
调用处理函数
reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
发送处理数据
err := s.sendResponse(t, stream, reply, cp, opts, comp)
}
//sendResponse()调用Write()构建发送Msg
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
return ht.do(func() {
ht.writeCommonHeaders(s)
ht.rw.Write(hdr)
ht.rw.Write(data)
ht.rw.(http.Flusher).Flush()
})
}
总结:gRPC提供一套server/client模型通信机制,底层通信依赖http2。样例中server与client端通信依赖protobuf文件,封装http协议客户端与服务端信息交互机制,client端最终调用http2.Write()发送数据,server端接收request,匹配handler函数,发送response结果给client。
下一篇,将对HTTP协议server端与client源码进行分析。