gRPC 的 4 种基础通信模式
题图 | from freepik
本文将讨论 gRPC 应用程序的 4 种基础通信模式:一元 RPC、服务器端流 RPC、客户端流 RPC 以及双向流 RPC。在这个过程中,我们会使用一些真实用例来展示每种模式,使用 gRPC IDL 进行服务定义,并使用 Go 语言来实现服务和客户端。
1. 一元RPC模式
我们从最简单的 RPC 风格开始讨论 gRPC 通信模式。一元 RPC 模式也被称为简单 RPC 模式。在该模式中,当客户端调用服务器端的远程方法时,客户端发送请求至服务器端并获得一个响应,与响应一起发送的还有状态细节以及 trailer 元数据。接下来看一个真实的用例,来进一步了解一元 RPC 模式。
假设需要为基于 gRPC 的在线零售应用程序构建 OrderManagement 服务,并在该服务中实现 getOrder 方法。借助该方法,客户端可以通过订单 ID 检索已有的订单。如图1 所示,客户端发送一个带有订单 ID 的请求,服务器端给出响应,响应中包含订单的信息。因此,它遵循一元 RPC 模式。
图1:一元 RPC 模式
下面来实现这种模式。第一步就是为 OrderManagement 服务及其 getOrder 方法创建服务定义。如代码清单1 所示,可以使用 protocol buffers 进行服务定义,getOrder 远程方法接受一个订单 ID 的请求,并且会给出一个包含 Order 消息的响应。在本用例中,Order 消息具有描述订单所需的结构。
代码清单1
OrderManagement服务定义,服务中的getOrder方法遵循一元 RPC 模式
syntax = "proto3";
import "google/protobuf/wrappers.proto"; ➊
package ecommerce;
service OrderManagement {
rpc getOrder(google.protobuf.StringValue) returns (Order); ➋
}
message Order { ➌
string id = 1;
repeated string items = 2; ➍
string description = 3;
float price = 4;
string destination = 5;
}
❶ 导入这个包,从而使用常见的类型,如 StringValue。
❷ 检索订单的远程方法。
❸ 定义 Order 类型。
❹ 使用 repeated 表明这个字段在消息中可以重复出现任意次,包括 0 次。在这里,一条订单消息可以有任意数量的条目。
然后,借助 gRPC 服务定义的 proto 文件,就可以生成服务器端骨架代码并实现 GetOrder 方法的逻辑了。代码清单2 展示了 OrderManagement 服务的 Go 实现。作为 GetOrder 方法的输入,单个订单 ID(String)用来组成请求,这样做可以很容易地在服务器端找到订单并以 Order 消息(Order 结构体)的形式进行响应。Order 消息可以和 nil 错误一起返回,从而告诉 gRPC,我们已经处理完 RPC,可以将 Order 返回到客户端了。
代码清单2 使用 Go 语言编写的
OrderManagement服务的GetOrder方法实现
// server/main.go
func (s *server) GetOrder(ctx context.Context,
orderId *wrapper.StringValue) (*pb.Order, error) {
// 服务实现
ord := orderMap[orderId.Value]
return &ord, nil
}
现在来实现客户端的逻辑,从而远程调用 GetOrder 方法。与服务器端的实现一样,可以为自己喜欢的语言生成代码来创建客户端存根,然后使用该存根调用服务,代码清单3 使用 Go gRPC 客户端调用 OrderManagement 服务。当然,首先要创建到服务器端的连接并初始化调用服务的客户端存根。然后,就可以调用客户端存根的 GetOrder 方法,从而实现对远程方法的调用。这时会得到一个 Order 消息作为响应,其中包含服务定义中使用 protocol buffers 所定义的订单信息。
代码清单3 使用 Go 语言调用远程
GetOrder方法的客户端实现
// 建立到服务器端的连接.
...
orderMgtClient := pb.NewOrderManagementClient(conn)
...
// 获取订单
retrievedOrder , err := orderMgtClient.GetOrder(ctx,
&wrapper.StringValue{Value: "106"})
log.Print("GetOrder Response -> : ", retrievedOrder)
这种一元 RPC 模式非常容易实现,适用于大多数进程间通信用例。在多种语言间,实现方式都是非常类似的,本文的示例代码仓库提供了 Go 和 Java 的源代码。
现在,我们已经对一元 RPC 模式有了大致的了解,接下来看一下服务器端流 RPC 模式。
2. 服务器端流RPC模式
在一元 RPC 模式中,gRPC 服务器端和 gRPC 客户端在通信时始终只有一个请求和一个响应。在服务器端流 RPC 模式中,服务器端在接收到客户端的请求消息后,会发回一个响应的序列。这种多个响应所组成的序列也被称为“流”。在将所有的服务器端响应发送完毕之后,服务器端会以 trailer 元数据的形式将其状态发送给客户端,从而标记流的结束。
下面通过一个真实的用例来进一步了解服务器端流。在 OrderManagement 服务中,假设需要实现一个订单搜索功能,利用该功能,只要提供一个搜索词就能得到匹配的结果,如图2 所示。OrderManagement 服务不会将所有匹配的订单一次性地发送给客户端,而是在找到匹配的订单时,逐步将其发送出去。这意味着当订单服务的客户端发出一个请求之后,会接收到多条响应消息。
图2:服务器端流 RPC 模式
现在,在 OrderManagement 服务的 gRPC 服务定义中新增 searchOrders 方法。如代码清单4 所示,searchOrders 方法定义与代码清单 3-1 中的 getOrder 方法非常类似,但是在服务定义的 proto 文件中,我们通过使用 returns (stream Order) 将返回参数指定为订单的流。
代码清单4 使用服务器端流 RPC 模式的服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
service OrderManagement {
...
rpc searchOrders(google.protobuf.StringValue) returns (stream Order); ➊
...
}
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
➊ 通过返回 Order 消息的 stream 定义服务器端流。
通过服务定义,可以生成服务器端的代码,然后通过实现所生成的接口,就可以为 OrderManagement 服务的 searchOrders 方法构建逻辑了。在代码清单5 所示的 Go 实现中,SearchOrders 方法有两个参数,分别是字符串类型的 searchQuery 和用来写入响应的特殊参数 OrderManagement_SearchOrdersServer。OrderManagement_SearchOrdersServer 是流的引用对象,可以写入多个响应。这里的业务逻辑是找到匹配的订单,并通过流将其依次发送出去。当找到新的订单时,使用流引用对象的 Send(...) 方法将其写入流。一旦所有响应都写到了流中,就可以通过返回 nil 来标记流已经结束,服务器端的状态和其他 trailer 元数据会发送给客户端。
代码清单5 使用 Go 语言编写的
SearchOrders方法的OrderManagement服务实现
func (s *server) SearchOrders(searchQuery *wrappers.StringValue,
stream pb.OrderManagement_SearchOrdersServer) error {
for key, order := range orderMap {
log.Print(key, order)
for _, itemStr := range order.Items {
log.Print(itemStr)
if strings.Contains(
itemStr, searchQuery.Value) { ➊
// 在流中发送匹配的订单
err := stream.Send(&order) ➋
if err != nil {
return fmt.Errorf(
"error sending message to stream : %v",
err) ➌
}
log.Print("Matching Order Found : " + key)
break
}
}
}
return nil
}
❶ 查找匹配的订单。
❷ 通过流发送匹配的订单。
❸ 检查在将消息以流的形式发送给客户端的过程中可能出现的错误。
客户端的远程方法调用和一元 RPC 模式中的非常类似。但是,因为服务器端往流中写入了多个响应,所以这里必须处理多个响应。因此,我们在 gRPC 客户端的 Go 语言实现中使用 Recv 方法从客户端流中检索消息,并且持续检索,直到流结束为止,如代码清单6 所示。
代码清单6 使用 Go 语言编写的
SearchOrders方法的OrderManagement客户端实现
// 建立到服务器端的连接
...
c := pb.NewOrderManagementClient(conn)
...
searchStream, _ := c.SearchOrders(ctx,
&wrapper.StringValue{Value: "Google"}) ➊
for {
searchOrder, err := searchStream.Recv() ➋
if err == io.EOF { ➌
break
}
// 处理可能出现的错误
log.Print("Search Result : ", searchOrder)
}
❶ SearchOrders 方法返回 OrderManagement_SearchOrdersClient 的客户端流,它有一个名为 Recv 的方法。
❷ 调用客户端流的 Recv 方法,逐个检索 Order 响应。
❸ 当发现流结束的时候,Recv 会返回 io.EOF。
下面看一下客户端流 RPC 模式,它恰好与服务器端流 RPC 模式相反。
3. 客户端流RPC模式
在客户端流 RPC 模式中,客户端会发送多个请求给服务器端,而不再是单个请求。服务器端则会发送一个响应给客户端。但是,服务器端不一定要等到从客户端接收到所有消息后才发送响应。基于这样的逻辑,我们可以在接收到流中的一条消息或几条消息之后就发送响应,也可以在读取完流中的所有消息之后再发送响应。
现在进一步扩展 OrderManagement 服务,从而更好地理解客户端流 RPC 模式。假设希望在 OrderManagement 服务中添加新的 updateOrders 方法,从而更新一个订单集合,如图3 所示。在这里,我们想以消息流的形式发送订单列表到服务器端,服务器端会处理这个流并发送一条带有已更新订单状态的消息给客户端。
图3:客户端流 RPC 模式
然后,可以将 updateOrders 方法添加到 OrderManagement 服务的服务定义文件中,如代码清单7 所示。只需使用 stream Order 作为 updateOrders 方法的参数,就能表明 updateOrders 会接收来自客户端的多条消息作为输入。因为服务器端只发送一个响应,所以返回值是单一的字符串消息。
代码清单7 具有客户端流 RPC 功能的服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
service OrderManagement {
...
rpc updateOrders(stream Order) returns (google.protobuf.StringValue);
...
}
message Order {
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
当更新完服务定义文件之后,就可以生成服务器端和客户端的代码了。在服务器端,需要实现 OrderManagement 服务中所生成的 updateOrders 方法接口。在代码清单8 所示的 Go 实现中,UpdateOrders 方法有一个 OrderManagement_UpdateOrdersServer 参数,它是客户端传入消息流的引用对象。因此,可以通过调用该对象的 Recv 方法来读取消息。根据业务逻辑,可以读取其中一些消息,也可以读取所有的消息。服务只需调用 OrderManagement_UpdateOrdersServer 对象的 SendAndClose 方法就可以发送响应,它同时也标记服务器端消息终结了流。如果要提前停止读取客户端流,那么服务器端应该取消客户端流,这样客户端就知道停止生成消息了。
代码清单8 使用 Go 语言编写的
UpdateOrders方法的OrderManagement服务实现
func (s *server) UpdateOrders(stream pb.OrderManagement_UpdateOrdersServer) error {
ordersStr := "Updated Order IDs : "
for {
order, err := stream.Recv() ➊
if err == io.EOF { ➋
// 完成读取订单流
return stream.SendAndClose(
&wrapper.StringValue{Value: "Orders processed "
+ ordersStr})
}
// 更新订单
orderMap[order.Id] = *order
log.Printf("Order ID ", order.Id, ": Updated")
ordersStr += order.Id + ", "
}
}
❶ 从客户端流中读取消息。
❷ 检查流是否已经结束。
下面来看这个客户端流用例的客户端实现。如代码清单9 中的 Go 实现所示,客户端可以通过客户端流引用,借助 updateStream.Send 方法发送多条消息。一旦所有消息都以流的形式发送出去,客户端就可以将流标记为已完成,并接收来自服务器端的响应。这是通过流引用的 CloseAndRecv 方法实现的。
代码清单9 使用 Go 语言编写的
UpdateOrders方法的OrderManagement客户端实现
// 建立到服务器端的连接
...
c := pb.NewOrderManagementClient(conn)
...
updateStream, err := client.UpdateOrders(ctx) ➊
if err != nil { ➋
log.Fatalf("%v.UpdateOrders(_) = _, %v", client, err)
}
// 更新订单1
if err := updateStream.Send(&updOrder1); err != nil { ➌
log.Fatalf("%v.Send(%v) = %v",
updateStream, updOrder1, err) ➍
}
// 更新订单2
if err := updateStream.Send(&updOrder2); err != nil {
log.Fatalf("%v.Send(%v) = %v",
updateStream, updOrder2, err)
}
// 更新订单3
if err := updateStream.Send(&updOrder3); err != nil {
log.Fatalf("%v.Send(%v) = %v",
updateStream, updOrder3, err)
}
updateRes, err := updateStream.CloseAndRecv() ➎
if err != nil {
log.Fatalf("%v.CloseAndRecv() got error %v, want %v",
updateStream, err, nil)
}
log.Printf("Update Orders Res : %s", updateRes)
❶ 调用 UpdateOrders 远程方法。
❷ 处理与 UpdateOrders 相关的错误。
❸ 通过客户端流发送订单更新的请求。
❹ 处理在发送消息到流时发生的错误。
❺ 关闭流并接收响应。
当调用这个方法后,会收到服务的响应消息。现在,我们对服务器端流 RPC 模式和客户端流 RPC 模式都有了非常好的了解。接下来将介绍双向流 RPC 模式,它是前面讨论的不同 RPC 风格的一种组合。
4. 双向流RPC模式
图4:双向流 RPC 模式
可以看到,这个业务用例的关键步骤如下所示。
客户端应用程序通过建立与服务器端的连接并发送调用元数据(头信息)初始化业务用例。
一旦连接成功建立,客户端应用程序就发送连续的订单 ID 集合,这些订单需要由
OrderManagement服务进行处理。每个订单 ID 以独立的 gRPC 消息的形式发送至服务器端。
服务会处理给定订单 ID 所对应的每个订单,并根据订单的投递位置将它们组织到发货组合中。
每个发货组合可能会包含多个订单,它们应该被投递到相同的目的地。
订单是成批处理的。当达到指定的批次大小时,当前创建的所有发货组合都会被发送至客户端。
假设流中有 4 个订单,其中有 2 个订单要发送至位置
X,另外两个要发送至位置Y,则可以将其表示为X、Y、X、Y。如果批次大小为 3,那么所创建的订单发货组合会是[X, X]、[Y]和[Y]。这些发货组合也会以流的形式发送至客户端。
这个业务用例的核心理念就是一旦调用 RPC 方法,那么无论是客户端还是服务器端,都可以在任意时间发送消息。这也包括来自任意一端的流结束标记。
下面看一下上述用例的服务定义。如代码清单10 所示,可以定义一个 processOrders 方法,该方法接受一个字符串流作为方法参数,代表了订单流 ID 并且以 CombinedShipment 流作为方法的返回值。因此,通过将方法参数和返回参数均声明为 stream,可以定义双向流的 RPC 方法。发货组合的消息也是通过服务定义声明的,它包含了订单元素的列表。
代码清单10 具有双向流 RPC 功能的服务定义
syntax = "proto3";
import "google/protobuf/wrappers.proto";
package ecommerce;
service OrderManagement {
...
rpc processOrders(stream google.protobuf.StringValue)
returns (stream CombinedShipment); ➊
}
message Order { ➋
string id = 1;
repeated string items = 2;
string description = 3;
float price = 4;
string destination = 5;
}
message CombinedShipment { ➌
string id = 1;
string status = 2;
repeated Order ordersList = 3;
}
❶ 在双向流 RPC 模式中,将方法参数和返回参数均声明为 stream。
❷ Order 消息的结构。
❸ CombinedShipment 消息的结构。
接下来,就可以根据更新后的服务定义生成服务器端的代码了。服务应该实现 OrderManagement 服务中的 processOrders 方法。如代码清单11 所示,在 Go 实现中,ProcessOrders 方法有一个 OrderManagement_ProcessOrdersServer 参数,它是客户端和服务器端之间消息流的对象引用。借助这个流对象,服务器端可以读取客户端以流的方式发送的消息,也能写入服务器端的流消息并返回给客户端。传入的消息流可以通过该引用对象的 Recv 方法来读取。在 ProcessOrders 方法中,服务可在持续读取传入消息流的同时,使用 Send 方法将消息写入同一个流中。
代码清单11 使用 Go 语言编写的
ProcessOrders方法的OrderManagement服务实现
func (s *server) ProcessOrders(
stream pb.OrderManagement_ProcessOrdersServer) error {
...
for {
orderId, err := stream.Recv() ➊
if err == io.EOF { ➋
...
for _, comb := range combinedShipmentMap {
stream.Send(&comb) ➌
}
return nil ➍
}
if err != nil {
return err
}
// 基于目的地位置,
// 将订单组织到发货组合中的逻辑
...
//
if batchMarker == orderBatchSize { ➎
// 将组合后的订单以流的形式分批发送至客户端
for _, comb := range combinedShipmentMap {
// 将发货组合发送到客户端
stream.Send(&comb) ➏
}
batchMarker = 0
combinedShipmentMap = make(
map[string]pb.CombinedShipment)
} else {
batchMarker++
}
}
}
❶ 从传入的流中读取订单 ID。
❷ 持续读取,直到流结束为止。
❸ 当流结束时,将所有剩余的发货组合发送给客户端。
❹ 通过返回 nil 标记服务器端流已经结束。
❺ 按批次处理订单。当达到该批次的规模时,将所有已创建的发货组合以流的形式发送给客户端。
❻ 将发货组合写入流中。
这里是基于订单 ID 来处理传入的订单的,当创建新的发货组合后,服务会将其写入相同的流中。这与客户端流 RPC 模式不同,当时服务通过 SendAndClose 方法写入流并将其关闭。当发现客户端流已经结束时,发送 nil 标记服务器端流的结束。
如代码清单12 所示,客户端实现与之前的示例非常相似。当客户端通过 OrderManagement 对象调用 ProcessOrders 方法时,它会得到一个对流的引用(streamProcOrder),这个引用可以用来发送消息到服务器端,也能读取来自服务器端的消息。
代码清单12 使用 Go 语言编写的
ProcessOrders方法的OrderManagement客户端实现
// 处理订单
streamProcOrder, _ := c.ProcessOrders(ctx) ➊
if err := streamProcOrder.Send(
&wrapper.StringValue{Value:"102"}); err != nil { ➋
log.Fatalf("%v.Send(%v) = %v", client, "102", err)
}
if err := streamProcOrder.Send(
&wrapper.StringValue{Value:"103"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "103", err)
}
if err := streamProcOrder.Send(
&wrapper.StringValue{Value:"104"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "104", err)
}
channel := make(chan struct{}) ➌
go asncClientBidirectionalRPC(streamProcOrder, channel) ➍
time.Sleep(time.Millisecond * 1000) ➎
if err := streamProcOrder.Send(
&wrapper.StringValue{Value:"101"}); err != nil {
log.Fatalf("%v.Send(%v) = %v", client, "101", err)
}
if err := streamProcOrder.CloseSend(); err != nil { ➏
log.Fatal(err)
}
<- channel
func asncClientBidirectionalRPC (
streamProcOrder pb.OrderManagement_ProcessOrdersClient,
c chan struct{}) {
for {
combinedShipment, errProcOrder := streamProcOrder.Recv() ➐
if errProcOrder == io.EOF { ➑
break
}
log.Printf("Combined shipment : ", combinedShipment.OrdersList)
}
<-c
}
❶ 调用远程方法并获取流引用,以便在客户端写入和读取。
❷ 向服务发送消息。
❸ 创建 Goroutines 所使用的通道。
❹ 使用 Goroutines 调用函数,以便并行读取来自服务的消息。
❺ 模拟向服务发送消息的延迟。
❻ 为客户端流标记流的结束(订单 ID)。
❼ 在客户端读取服务的消息。
❽ 该条件探测流是否已经结束。
客户端可以在任意时间发送消息给服务并关闭流。读取消息也是同样的道理。前面的示例使用了 Go 语言中的 Goroutines,在两个并发线程中执行客户端的消息写入逻辑和消息服务逻辑。
![]()
Goroutines在 Go 语言中,
Goroutines是能够与其他函数或方法并行运行的函数或方法,可以将它们视为轻量级的线程。
客户端可以并发读取和写入同一个流,输入流和输出流可以独立进行操作。这里所展示的是稍微复杂的示例,它展现了双向流 RPC 模式的威力。流的操作完全独立,客户端和服务器端可以按照任意顺序进行读取和写入,理解这一点非常重要。一旦建立连接,客户端和服务器端之间的通信模式就完全取决于客户端和服务器端本身。
目前我们已经讨论了所有可能的通信模式,可以使用它们实现基于 gRPC 的应用程序之间的交互。至于具体选择哪种通信模式,并没有硬性的规定,但是最好的办法就是分析业务用例,并据此选择最合适的模式。
在结束关于 gRPC 通信模式的讨论之前,还有一个重要的方面需要了解,即 gRPC 是如何应用于微服务通信的。
5. 使用gRPC实现微服务通信
gRPC 的主要用途之一就是实现微服务以及服务之间的通信。在微服务的服务间通信中,gRPC 会与其他通信协议一同使用,并且 gRPC 服务通常会实现为多语言服务(由不同的语言实现)。为了进一步理解该技术,下面来看在线零售系统这样一个真实的场景,如图5 所示,它是对前述内容的扩展。
图5:使用 gRPC 和其他协议的通用微服务部署模式
该场景中有许多微服务,每个微服务都面向在线零售系统的特定业务能力。有一些服务的实现形式是 gRPC 服务,如 Product 服务;另外还有一些组合服务,如 Catalog 服务,它会调用底层的服务来构建其业务能力。大多数同步消息可以使用 gRPC 来传递。如果有特定的异步消息场景,可能需要持久化消息,那么就可以使用事件代理或消息代理,如 Kafka、Active MQ、RabbitMQ 和 NATS。当需要将特定的业务功能暴露到外部时,可以使用传统的基于 REST 或 OpenAPI 的服务或者 GraphQL 服务。因此,Catalog 和 Checkout 等服务消费基于 gRPC 的后端服务,同时暴露基于 REST 或 GraphQL 的外部接口。
在大多数实际用例中,这些面向外部的服务是通过 API 网关暴露的。这里可以应用各种非功能性的能力,如安全性、节流、版本化等。大多数这样的 API 使用像 REST 或 GraphQL 这样的协议,但还有一种可能,这种情况不太常见,那就是只要 API 网关支持暴露 gRPC 接口,gRPC 就可以作为对外的接口。API 网关实现了横切性的功能,如认证、日志、版本化、节流和负载均衡。通过组合使用 API 网关与 gRPC API,可以将这些功能部署到核心 gRPC 服务之外。这种架构还有另外一个重要方面,那就是可以使用多种编程语言,但共享相同的服务契约,比如通过相同的 gRPC 服务定义来生成代码。这样一来,便可以根据服务的业务能力来选择适当的实现技术。
6. 小结
gRPC 提供了一组不同的 RPC 通信风格,用于在基于 gRPC 的应用程序之间构建进程间通信。本文探讨了 4 种主要的通信模式,其中一元 RPC 模式是最基本的一种模式,它是一种非常简单的请求–响应式 RPC;服务器端流 RPC 模式可以在第一次调用远程方法后从服务向消费者发送多条消息;客户端流 RPC 模式可以从客户端向服务发送多条消息;双向流 RPC 模式有一点复杂,其中流的操作是完全独立的,客户端和服务器端可以按照任意顺序进行读取和写入。另外,本文深入研究了如何通过一些真实的用例来实现这些模式。
本文内容对实现任何 gRPC 用例都非常有用,你可以根据实际情况选择最合适的通信模式。
7. 新书推荐
本文引自《 gRPC与云原生应用开发:以Go和Java为例》,刚上架的新书,欢迎有需要的朋友关注。
8. 留言福利
小伙伴们,今天是开工第一天,请大家来聊聊开工后最想读的书是哪一本,理由是什么?
留言告诉我们,随机选取 3 位读者,可从图灵 2021 年出版的新书中任选一本,作为你的开工礼物。
统计截止时间:2021 年 2 月 22 日 12:00。
