vlambda博客
学习文章列表

grpc应用之二 gRPC使用

 安装grpc

在上一个例子

grpc应用之一 使用protobuf
https://realjf.io/grpc/protobuf-quick-start

go get -u google.golang.org/grpc@latest



gRPC的四种调用方式

  • 一元RPC

  • 服务端流式RPC

  • 客户端流式RPC

  • 双向流式RPC


修改server目录下的server.go文件,其代码如下:


package main

import ( "flag" // 用pb别名来引用proto里定义的类型方法 pb "github.com/realjf/grpc-demo/proto")

var port string

func init() { flag.StringVar(&port, "p", "8000", "启动端口号") flag.Parse()}


每次在proto文件中定义RPC方法的proto时,需要重新在根目录下运行如下命令重新编译生成语句:


protoc --go_out=plugins=grpc:. ./proto/*.proto




一元RPC

一元 RPC,也就是是单次 RPC 调用,简单来讲就是客户端发起一次普通的 RPC 请求,响应,是最基础的调用类型,也是最常用的方式,


proto文件


rpc SayHello(HelloRequest) returns (HelloRespnose) {};



server端

server.go代码如下:


package main

import ( "context" "flag" "net"

pb "github.com/realjf/grpc-demo/proto" "google.golang.org/grpc")

var port string

func init() { flag.StringVar(&port, "p", "8000", "启动端口号") flag.Parse()}

type GreeterServer struct{}

func (s *GreeterServer) SayHello(ctx context.Context, r *pb.HelloRequest) (*pb.HelloResponse, error) { return &pb.HelloResponse{Code: 200, Message: "success"}, nil}

func main() { server := grpc.NewServer() pb.RegisterGreeterServer(server, &GreeterServer{}) lis, _ := net.Listen("tcp", ":"+port) server.Serve(lis)}




  •  创建gRPC Server对象,你可以理解为它是Server端的抽象对象

  • 将GreeterServer注册到gRPC Server的内部注册中心,这样可以在接收到请求时,通过内部的“服务发现”,发现该服务端接口并转移进行逻辑处理

  •  创建Listen,监听TCP端口

  • gRPC Server开始lis.Accept,直到Stop或GracefulStop


client

client/client.go的代码如下:



package main

import ( "context" "flag" "log"

pb "github.com/realjf/grpc-demo/proto" "google.golang.org/grpc")

var port string

func init() { flag.StringVar(&port, "p", "8000", "启动端口号") flag.Parse()}

func main() { conn, _ := grpc.Dial(":"+port, grpc.WithInsecure()) defer conn.Close()

client := pb.NewGreeterClient(conn) _ = SayHello(client)}

func SayHello(client pb.GreeterClient) error { resp, _ := client.SayHello(context.Background(), &pb.HelloRequest{Name: "realjf"}) log.Printf("client.SayHello resp: %s", resp.Message) return nil}



  •  创建与给定服务端的连接句柄

  • 创建Greeter的客户端对象

  • 发送RPC请求,等待同步响应,得到回调后返回响应结果


现在可以运行一元RPC的服务端和客户端查看结果


服务端流式RPC

服务器端流式 RPC,也就是是单向流,并代指 Server 为 Stream,Client 为普通的一元 RPC 请求。


简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。


proto文件


新增如下内容:


rpc SayList(HelloRequest) returns (stream HelloResponse) {};


server端



func (s *GreeterServer) SayList(r *pb.HelloRequest, stream pb.Greeter_SayListServer) error { for n := 0; n <= 6; n++ { _ = stream.Send(&pb.HelloResponse{Code: 200, Message: "hello.list"}) } return nil}


在 Server 端,主要留意 stream.Send 方法,是 protoc 在生成时,根据定义生成了各式各样符合标准的接口方法。最终再统一调度内部的 SendMsg 方法,该方法涉及以下过程:


  •  消息体序列化

  •  压缩序列化后的消息体

  •  对正在传输的消息体增加5个字节的header(标志位)

  •  判断压缩+序列化后的消息体的总字节长度是否大于预设的maxSendMessageSize,若超出则提示错误。

  •  写入给流的数据集


client端



func SayList(client pb.GreeterClient, r *pb.HelloRequest) error { stream, _ := client.SayList(context.Background(), r) for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err } log.Printf("resp: %v", resp) } return nil}



在client端,stream.Recv()方法是对ClientStream.RecvMsg方法的封装,而RecvMsg方法会从流中读取完整的gRPC消息体,


  •  RecvMsg是阻塞等待的

  •  RecvMsg当流成功/结束(调用了Close)时,会返回io.EOF

  •  RecvMsg当流出现任何错误时,流会被终止,错误信息会包含RPC错误码,而在RecvMsg中可能出现如下错误:

    •    io.EOF、io.ErrUnexpectedEOF

    •   transport.ConnectionError

    •    google.golang.org/grpc/codes(grpc的预定义错误码)

 

需要注意的是,默认的 MaxReceiveMessageSize 值为 1024 *1024* 4,若有特别需求,可以适当调整。


客户端流式RPC

客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端,


proto文件


rpc SayRecord(stream HelloRequest) returns (HelloResponse) {};




server端


func (s *GreeterServer) SayRecord(stream pb.Greeter_SayRecordServer) error { for { resp, err := stream.Recv() if err == io.EOF { return stream.SendAndClose(&pb.HelloResponse{Message: "say.record"}) } if err != nil { return err } log.Printf("resp: %v", resp) } return nil}




你可以发现在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF (流关闭) 后,需要通过 stream.SendAndClose 方法将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv。


client端


func SayRecord(client pb.GreeterClient, r *pb.HelloRequest) error { stream, _ := client.SayRecord(context.Background()) for n := 0; n < 6; n++ { _ = stream.Send(r) } resp, _ := stream.CloseAndRecv()

log.Printf("resp err: %v", resp) return nil}


在 Server 端的 stream.CloseAndRecv,与 Client 端 stream.SendAndClose 是配套使用的方法。


双向流式RPC

双向流式 RPC,顾名思义是双向流,由客户端以流式的方式发起请求,服务端同样以流式的方式响应请求。


首个请求一定是 Client 发起,但具体交互方式(谁先谁后、一次发多少、响应多少、什么时候关闭)根据程序编写的方式来确定(可以结合协程)。


proto文件


rpc SayRoute(stream HelloRequest) returns (stream HelloResponse) {};



server端


func (s *GreeterServer) SayRoute(stream pb.Greeter_SayRouteServer) error { n := 0 for { _ = stream.Send(&pb.HelloResponse{Code: 200, Message: "say.route"})

resp, err := stream.Recv() if err == io.EOF { return nil } if err != nil { return err }

n++ log.Printf("resp: %v", resp) }}



 client端


func SayRoute(client pb.GreeterClient, r *pb.HelloRequest) error { stream, _ := client.SayRoute(context.Background()) for n := 0; n <= 6; n++ { _ = stream.Send(r) resp, err := stream.Recv() if err == io.EOF { break } if err != nil { return err }

log.Printf("resp err: %v", resp) }

_ = stream.CloseSend() return nil}