gRPC三种客户端类型实践【Java版】
本文承袭内容,学会了基本的gRPC的基本Demo之后,自然要开始了各类客户端的学习。由于服务端的代码都是由开发写好的,所以作为新手测试来说,我觉得学好客户端的代码优先级更高一些。
书接上文,gRPC客户端有三种实现方式,其实就是从io.grpc.ManagedChannel
创建客户端Stub
的过程。三种方式分别为:newBlockingStub
、newStub
、newFutureStub
。下面通过代码演示来分享三种的区别和优劣。
gRPC客户端目前用起来跟HTTP协议一样,调用方式跟HttpClient调用一样。分成了阻塞、异步和future
,有兴趣可以移步。
服务端
服务端是上期进行改造,主要是增加了响应等待时间和时间信息,方便后面验证不同客户端功能。代码如下:
package com.funtester.grpc;
import com.funtester.frame.SourceCode;
import com.funtester.fungrpc.HelloRequest;
import com.funtester.fungrpc.HelloResponse;
import com.funtester.fungrpc.HelloServiceGrpc;
import com.funtester.utils.Time;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {
private static final Logger logger = LogManager.getLogger(HelloServiceImpl.class);
@Override
public void executeHi(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
HelloResponse response = HelloResponse.newBuilder()
.setMsg("你好 " + request.getName()+ Time.getDate())
.build();
SourceCode.sleep(1.0);
logger.info("用户{}来了",request.getName());
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
newBlockingStub
顾名思义,这个是阻塞调用的gRPC客户端类型,实际使用中跟HTTP接口请求->响应
一样,代码如下:
package com.funtest.grpc
import com.funtester.frame.SourceCode
import com.funtester.fungrpc.HelloRequest
import com.funtester.fungrpc.HelloResponse
import com.funtester.fungrpc.HelloServiceGrpc
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import java.util.concurrent.ExecutionException
class BlockClient extends SourceCode {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.build()
HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel).withCompression("gzip")
HelloRequest helloRequest = HelloRequest.newBuilder()
.setName("FunTester")
.build()
5.times {
HelloResponse orderResponse = helloServiceBlockingStub.executeHi(helloRequest)
output("收到响应: " + orderResponse.getMsg())
}
managedChannel.shutdown()
}
}
控制台输出:
20:46:04.664 main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
20:46:04.675 main
###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # #
20:46:06.517 main 收到响应: 你好 FunTester2022-05-09 20:46:05
20:46:07.521 main 收到响应: 你好 FunTester2022-05-09 20:46:06
20:46:08.531 main 收到响应: 你好 FunTester2022-05-09 20:46:07
20:46:09.542 main 收到响应: 你好 FunTester2022-05-09 20:46:08
20:46:10.552 main 收到响应: 你好 FunTester2022-05-09 20:46:09
进程已结束,退出代码0
比较简单,这里不多做介绍了。
newStub
看名字有点猜不出来,这是个纯异步调用客户端。写上去代码可能比较多,但是如果把io.grpc.stub.StreamObserver
对象拆开看就会比较容易懂一些。代码如下:
package com.funtest.grpc
import com.funtester.frame.SourceCode
import com.funtester.fungrpc.HelloRequest
import com.funtester.fungrpc.HelloResponse
import com.funtester.fungrpc.HelloServiceGrpc
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.stub.StreamObserver
import java.util.concurrent.ExecutionException
class SyncClient extends SourceCode {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.build()
HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(managedChannel).withCompression("gzip")
HelloRequest helloRequest = HelloRequest.newBuilder()
.setName("FunTester")
.build()
StreamObserver<HelloResponse> helloResponseStreamObserver = new StreamObserver<HelloResponse>() {
@Override
void onNext(HelloResponse value) {
output(value.getMsg())
}
@Override
void onError(Throwable t) {
output(t.getMessage())
}
@Override
void onCompleted() {
}
}
5.times {helloServiceStub.executeHi(helloRequest, helloResponseStreamObserver)}
sleep(2000)
managedChannel.shutdown()
}
}
由于都是异步,所以相当于自动多线程了。控制台输出如下:
20:50:59.053 main
###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # #
20:51:00.816 grpc-default-executor-4 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-1 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-3 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-0 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-2 你好 FunTester2022-05-09 20:50:59
进程已结束,退出代码0
可以看到,所有请求响应的结果时间都是一样的,说明请求到达服务端时间是一样的。
newFutureStub
这种客户端也是异步的,之所以放在最后将是因为它具有同步客户端的属性,在实际使用中,既可以当做异步客户端使用也可以当做一个同步的客户端使用。下面是演示代码:
package com.funtest.grpc
import com.funtester.frame.SourceCode
import com.funtester.fungrpc.HelloRequest
import com.funtester.fungrpc.HelloResponse
import com.funtester.fungrpc.HelloServiceGrpc
import com.google.common.util.concurrent.ListenableFuture
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import java.util.concurrent.ExecutionException
class FutureClient extends SourceCode {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 8080)
.usePlaintext()
.build()
HelloServiceGrpc.HelloServiceFutureStub helloServiceFutureStub = HelloServiceGrpc.newFutureStub(managedChannel).withCompression("gzip")
HelloRequest helloRequest = HelloRequest.newBuilder()
.setName("FunTester")
.build()
//同步客户端的使用方式
5.times {
ListenableFuture<HelloResponse> helloResponseListenableFuture = helloServiceFutureStub.executeHi(helloRequest)
HelloResponse helloResponse = helloResponseListenableFuture.get()
output(helloResponse.getMsg())
}
//异步客户端的使用方式
def res = []
5.times {
ListenableFuture<HelloResponse> helloResponseListenableFuture = helloServiceFutureStub.executeHi(helloRequest)
res << helloResponseListenableFuture
}
res.each {
output(it.get().getMsg())
}
managedChannel.shutdown()
}
}
控制台输出:
21:03:07.312 main
###### # # # # ####### ###### ##### ####### ###### #####
# # # ## # # # # # # # #
#### # # # # # # #### ##### # #### #####
# # # # # # # # # # # # #
# ##### # # # ###### ##### # ###### # #
21:03:09.226 main 你好 FunTester2022-05-09 21:03:08
21:03:10.232 main 你好 FunTester2022-05-09 21:03:09
21:03:11.238 main 你好 FunTester2022-05-09 21:03:10
21:03:12.247 main 你好 FunTester2022-05-09 21:03:11
21:03:13.255 main 你好 FunTester2022-05-09 21:03:12
21:03:14.262 main 你好 FunTester2022-05-09 21:03:13
21:03:14.281 main 你好 FunTester2022-05-09 21:03:13
21:03:14.281 main 你好 FunTester2022-05-09 21:03:13
21:03:14.282 main 你好 FunTester2022-05-09 21:03:13
21:03:14.282 main 你好 FunTester2022-05-09 21:03:13
进程已结束,退出代码0
可以看到,前面五个请求是串行的,后面的五个请求是并行的。在实际工作中,使用到异步调用又要处理结果的地方也是这种类型使用较多,而使用Java的线程同步类,往往比较麻烦也不够优雅。