vlambda博客
学习文章列表

gRPC四种类型示例分析【知识笔记】

目录

一、前言
二、ProtoBuf定义
三、代码结构
1.服务端
2.客户端
四、交互走查
1.简单gRPC交互(UNARY)
2.服务端到客户端流式交互
3.客户端到服务端流式交互
4.双向流式RPC
五、系列文章

一、前言

本文分析下gRPC支持类型的示例,Protobuf生成代码详见前面文章“Google Protocol Buffers三两事” 以及 Maven插件使用参见前面文章 “gRPC示例初探”;具体链接见本文结尾系列文章。gRPC提供四种服务类型,分别为:简单RPC、服务端到客户端流式RPC、客户端到服务端流式RPC、双向流式RPC。将“route_guide.proto”拷贝到工程目录,Maven编译时会生成代码。

二、ProtoBuf定义

下面Protobuf定义了gRPC提供的四种服务类型,走查下内容。

// @1 使用proto3语法
syntax = "proto3";
//
@2 生成多个类
option java_multiple_files = true;
//
@3 生成java类所在的包
option java_package = "io.grpc.examples.routeguide";
//
@4 生成外层类类名
option java_outer_classname = "RouteGuideProto";
//
@5 Objective-C类的前缀
option objc_class_prefix = "RTG";
//
@6 .proto包名
package routeguide;
//
@7 定义RPC服务RouteGuide
service RouteGuide {
//
@8 简单RPC接受Point参数返回Feature类型对象
rpc GetFeature(Point) returns (Feature) {}
//
@9 服务端到客户端流式RPC,接受Rectangle对象参数,返回批量Feature数据
rpc ListFeatures(Rectangle) returns (stream Feature) {}
//
@10 客户端到服务端流式RPC,接受批量Point数据,返回RouteSummary类型对象
rpc RecordRoute(stream Point) returns (RouteSummary) {}
//
@11 双向流式RPC,接受批量RouteNote类型数据,返回批量RouteNote类型数据
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
message Rectangle {
Point lo = 1;
Point hi = 2;
}
message Feature {
string name = 1;
Point location = 2;
}
message FeatureDatabase {
repeated Feature feature = 1;
}
message RouteNote {
Point location = 1;
string message = 2;
}
message RouteSummary {
int32 point_count = 1;
int32 feature_count = 2;
int32 distance = 3;
int32 elapsed_time = 4;
}

三、代码结构

编译工具生成了Protobuf定义的Message对应的类和Builder类、gRPC服务端对外提供的接口、客户端调用服务端的存根。

1.服务端

启动gRPC Server

public static void main(String[] args) throws Exception {
RouteGuideServer server = new RouteGuideServer(8980);
server.start();
server.blockUntilShutdown();
}
// @1 启动Server监听处理客户端请求

public RouteGuideServer(int port, URL featureFile) throws IOException {
this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
}
// @2 将数据从文件route_guide_db.json读出

public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
this.port = port;
server = serverBuilder.addService(new RouteGuideService(features))
.build();
}
// @3 将自定义实现的RouteGuideService注册到Server

实现gRPC服务接口

编译工具生成了gRPC服务端对外提供的接口,我们使用时需要实现该接口即可,即实际的Server端处理逻辑。

private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
}
// @1 RouteGuideGrpc.RouteGuideImplBase由编译器生成Server端接口类
//
@2 RouteGuideService由用户实现的类处理Server端业务逻辑

小结:在服务端我们需要做实现生成的服务接口,并将该服务实现类注册到gRPC Server中。

2.客户端

构建通道和存根

 public static void main(String[] args) throws InterruptedException {
String target = "localhost:8980";
List<Feature> features;
// @1 读取route_guide_db.json测试数据
features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
// @2 构建与Server的RPC通道
ManagedChannel channel = ManagedChannelBuilder.forTarget(target).usePlaintext().build();
RouteGuideClient client = new RouteGuideClient(channel);
}

public RouteGuideClient(Channel channel) {
// @3 构建同步调用存根
blockingStub = RouteGuideGrpc.newBlockingStub(channel);
// @4 构建异步调用存根
asyncStub = RouteGuideGrpc.newStub(channel);
}
四、交互走查
1 简单gRPC交互(UNARY)

客户端调用

RouteGuideClient client = new RouteGuideClient(channel);
client.getFeature(409146138, -746188906);

Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
// @1 使用同步调用存根向服务端发起请求,入参为普通对象
feature = blockingStub.getFeature(request);

System.out.printf("Found feature called %s, at %s, %s!%n",
feature.getName(),
RouteGuideUtil.getLatitude(feature.getLocation()),
RouteGuideUtil.getLongitude(feature.getLocation()));

服务端响应

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
// @2 通过responseObserver.onNext向客户端发送消息
responseObserver.onNext(checkFeature(request));
// @3 标记服务端响应完成
responseObserver.onCompleted();
}

输出内容

Found feature called Berkshire Valley Management Area Trail, Jefferson, NJ, USA, at 40.9146138, -74.6188906!

备注:使用观察者模式响应客户端请求。

2.服务端到客户端流式交互

客户端调用

 RouteGuideClient client = new RouteGuideClient(channel);
client.listFeatures(400000000, -750000000, 420000000, -730000000);
 public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
Iterator<Feature> features;
// @1 客户端调用同步存根向服务端发起调用
features = blockingStub.listFeatures(request);
for (int i = 1; features.hasNext(); i++) {
Feature feature = features.next();
info("Result #" + i + ": {0}", feature);
}
}

服务端响应

public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
// ...
for (Feature feature : features) {
if (!RouteGuideUtil.exists(feature)) {
continue;
}
int lat = feature.getLocation().getLatitude();
int lon = feature.getLocation().getLongitude();
if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
// @2 循环调用responseObserver.onNext向客户端回调发送数据
responseObserver.onNext(feature);
}
}
// @3 标记服务端响应完成
responseObserver.onCompleted();
}

输出内容

Result #62: name: "3387 Richmond Terrace, Staten Island, NY 10303, USA"
location {
latitude: 406411633
longitude: -741722051
}
Result #63: name: "261 Van Sickle Road, Goshen, NY 10924, USA"
location {
latitude: 413069058
longitude: -744597778
}
//...

备注:服务端到客户端流式交互。即:SERVER_STREAMING。客户端通过存根发起RPC调用,由服务端多次调用onNext回调客户端完成响应。

3.客户端到服务端流式交互

客户端调用

RouteGuideClient client = new RouteGuideClient(channel);
client.recordRoute(features, 10);
public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
info("*** RecordRoute");
final CountDownLatch finishLatch = new CountDownLatch(1);
// @1 创建responseObserver用于服务端回调客户端
StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
@Override
// @2 响应服务端responseObserver.onNext回调
public void onNext(RouteSummary summary) {
// print
}
@Override
// @3 响应服务端responseObserver.onError回调
public void onError(Throwable t) {
// print
}
@Override
// @4 响应服务端responseObserver.onCompleted的回调
public void onCompleted() {
// print
finishLatch.countDown();
}
};
// @5 通过异步存根发起调用,参数为响应观察者responseObserver
StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
try {
for (int i = 0; i < numPoints; ++i) {
int index = random.nextInt(features.size());
Point point = features.get(index).getLocation();
// print
// @6 多次调用requestObserver.onNext向服务端写入数据
requestObserver.onNext(point);
Thread.sleep(random.nextInt(1000) + 500);
if (finishLatch.getCount() == 0) {
return;
}
}
} catch (RuntimeException e) {
requestObserver.onError(e);
throw e;
}
// @7 标记客户端写入结束
requestObserver.onCompleted();
}

服务端响应

@Override
public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
// @1 构造观察者与客户端交互
return new StreamObserver<Point>() {
int pointCount;
int featureCount;
int distance;
Point previous;
final long startTime = System.nanoTime();

@Override
// @2 响应客户端
public void onNext(Point point) {
pointCount++;
if (RouteGuideUtil.exists(checkFeature(point))) {
featureCount++;
}
if (previous != null) {
distance += calcDistance(previous, point);
}
previous = point;
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "recordRoute cancelled");
}

@Override
// @3 在客户端调用requestObserver.onCompleted()时触发,标记服务端处理完成
public void onCompleted() {
long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
// @4 回调客户端responseObserver.onNext
responseObserver.onNext(RouteSummary.newBuilder()
.setPointCount(pointCount)
.setFeatureCount(featureCount).setDistance(distance)
.setElapsedTime((int) seconds).build());
// @5 回调客户端responseObserver.onCompleted标记完成
responseObserver.onCompleted();
}
};
}

输出内容

*** RecordRoute
Visiting point 40.213, -74.361
Visiting point 41.478, -74.062
Visiting point 40.915, -74.619
Visiting point 40.213, -74.361
Visiting point 40.408, -74.612
Visiting point 40.431, -74.028
Visiting point 40.964, -74.602
Visiting point 40.007, -74.679
Visiting point 41.268, -74.261
Visiting point 40.812, -74.4
Finished trip with 10 points. Passed 5 features. Travelled 761,415 meters. It took 110 seconds.
Finished RecordRoute

小结:客户端到服务端流式交互,即:CLIENT_STREAMING。客户端由异步存根asyncStub发起调用,参数为“responseObserver”;服务端通过onNext响应客户端请求,在客户端触发写入结束响应onCompleted后,服务端onCompleted被触发,调用响应观察者“responseObserver”回调到客户端完成结束操作。

4.双向流式RPC

客户端调用

RouteGuideClient client = new RouteGuideClient(channel);
CountDownLatch finishLatch = client.routeChat();
public CountDownLatch routeChat() {
final CountDownLatch finishLatch = new CountDownLatch(1);

StreamObserver<RouteNote> requestObserver =

    // @1 使用异步存根asyncStub向服务端发起调用

    // 调用参数为响应观察者即:responseObserver

asyncStub.routeChat(new StreamObserver<RouteNote>() {
@Override
// @2 响应服务端responseObserver回调onNext
public void onNext(RouteNote note) {
// print...
}
@Override
public void onError(Throwable t) {
// print...
finishLatch.countDown();
}
@Override
// @3 响应服务端responseObserver回调onCompleted
public void onCompleted() {
info("Finished RouteChat");
finishLatch.countDown();
}
});

RouteNote[] requests =
{newNote("First message", 0, 0), newNote("Second message", 0, 1),
newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

for (RouteNote request : requests) {
// @4 循环调用requestObserver.onNext向服务端写入消息
requestObserver.onNext(request);
}

// @5 标记客户端写入完成
requestObserver.onCompleted();

return finishLatch;
}

服务端响应

@Override
public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
return new StreamObserver<RouteNote>() {
@Override
// @1 当客户端调用requestObserver.onNext触发接受数据
public void onNext(RouteNote note) {
List<RouteNote> notes = getOrCreateNotes(note.getLocation());
for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
// @2 循环调用responseObserver.onNext回调客户端
responseObserver.onNext(prevNote);
}
notes.add(note);
}

@Override
public void onError(Throwable t) {
logger.log(Level.WARNING, "routeChat cancelled");
}

@Override
// @3 客户端调用requestObserver.onCompleted()时触发
public void onCompleted() {
// @4 回调客户端标记完成
responseObserver.onCompleted();
}
};
}

输出内容

信息: *** RouteChat
信息: Sending message "First message" at 0, 0
信息: Sending message "Second message" at 0, 1
信息: Sending message "Third message" at 1, 0
信息: Sending message "Fourth message" at 1, 1
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "First message" at 0, 0
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Second message" at 0, 1
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Third message" at 1, 0
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Got message "Fourth message" at 1, 1
信息: Finished RouteChat

小结:双向流式RPC,即:BIDI_STREAMING。客户端和服务端均通过StreamObserver来交互,客户端发起时传入responseObserver,服务端可以通过responseObserver对客户端进行回调。

五、系列文章





「瓜农老梁  学习同行」