vlambda博客
学习文章列表

gRPC三种客户端类型实践【Java版】

本文承袭内容,学会了基本的gRPC的基本Demo之后,自然要开始了各类客户端的学习。由于服务端的代码都是由开发写好的,所以作为新手测试来说,我觉得学好客户端的代码优先级更高一些。

书接上文,gRPC客户端有三种实现方式,其实就是从io.grpc.ManagedChannel创建客户端Stub的过程。三种方式分别为:newBlockingStubnewStubnewFutureStub。下面通过代码演示来分享三种的区别和优劣。

gRPC客户端目前用起来跟HTTP协议一样,调用方式跟HttpClient调用一样。分成了阻塞、异步和future,有兴趣可以移步。

服务端

服务端是上期进行改造,主要是增加了响应等待时间和时间信息,方便后面验证不同客户端功能。代码如下:

package com.funtester.grpc;

import com.funtester.frame.SourceCode;
import com.funtester.fungrpc.HelloRequest;
import com.funtester.fungrpc.HelloResponse;
import com.funtester.fungrpc.HelloServiceGrpc;
import com.funtester.utils.Time;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {

    private static final Logger logger = LogManager.getLogger(HelloServiceImpl.class);

    @Override
    public void executeHi(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        HelloResponse response = HelloResponse.newBuilder()
                .setMsg("你好 " + request.getName()+ Time.getDate())
                .build();
        SourceCode.sleep(1.0);
        logger.info("用户{}来了",request.getName());
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

}

newBlockingStub

顾名思义,这个是阻塞调用的gRPC客户端类型,实际使用中跟HTTP接口请求->响应一样,代码如下:

package com.funtest.grpc

import com.funtester.frame.SourceCode
import com.funtester.fungrpc.HelloRequest
import com.funtester.fungrpc.HelloResponse
import com.funtester.fungrpc.HelloServiceGrpc
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder

import java.util.concurrent.ExecutionException

class BlockClient extends SourceCode {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost"8080)
                .usePlaintext()
                .build()

        HelloServiceGrpc.HelloServiceBlockingStub helloServiceBlockingStub = HelloServiceGrpc.newBlockingStub(managedChannel).withCompression("gzip")

        HelloRequest helloRequest = HelloRequest.newBuilder()
                .setName("FunTester")
                .build()
        5.times {
            HelloResponse orderResponse = helloServiceBlockingStub.executeHi(helloRequest)
            output("收到响应: " + orderResponse.getMsg())
        }
        managedChannel.shutdown()
    }

}

控制台输出:

20:46:04.664 main 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
20:46:04.675 main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #

20:46:06.517 main 收到响应: 你好 FunTester2022-05-09 20:46:05
20:46:07.521 main 收到响应: 你好 FunTester2022-05-09 20:46:06
20:46:08.531 main 收到响应: 你好 FunTester2022-05-09 20:46:07
20:46:09.542 main 收到响应: 你好 FunTester2022-05-09 20:46:08
20:46:10.552 main 收到响应: 你好 FunTester2022-05-09 20:46:09

进程已结束,退出代码0

比较简单,这里不多做介绍了。

newStub

看名字有点猜不出来,这是个纯异步调用客户端。写上去代码可能比较多,但是如果把io.grpc.stub.StreamObserver对象拆开看就会比较容易懂一些。代码如下:

package com.funtest.grpc

import com.funtester.frame.SourceCode
import com.funtester.fungrpc.HelloRequest
import com.funtester.fungrpc.HelloResponse
import com.funtester.fungrpc.HelloServiceGrpc
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder
import io.grpc.stub.StreamObserver

import java.util.concurrent.ExecutionException

class SyncClient extends SourceCode {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost"8080)
                .usePlaintext()
                .build()

        HelloServiceGrpc.HelloServiceStub helloServiceStub = HelloServiceGrpc.newStub(managedChannel).withCompression("gzip")

        HelloRequest helloRequest = HelloRequest.newBuilder()
                .setName("FunTester")
                .build()


        StreamObserver<HelloResponse> helloResponseStreamObserver = new StreamObserver<HelloResponse>() {

            @Override
            void onNext(HelloResponse value) {
                output(value.getMsg())
            }

            @Override
            void onError(Throwable t) {
                output(t.getMessage())
            }

            @Override
            void onCompleted() {

            }
        }
        5.times {helloServiceStub.executeHi(helloRequest, helloResponseStreamObserver)}
        sleep(2000)
        managedChannel.shutdown()
    }

}

由于都是异步,所以相当于自动多线程了。控制台输出如下:


20:50:59.053 main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #

20:51:00.816 grpc-default-executor-4 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-1 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-3 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-0 你好 FunTester2022-05-09 20:50:59
20:51:00.816 grpc-default-executor-2 你好 FunTester2022-05-09 20:50:59

进程已结束,退出代码0

可以看到,所有请求响应的结果时间都是一样的,说明请求到达服务端时间是一样的。

newFutureStub

这种客户端也是异步的,之所以放在最后将是因为它具有同步客户端的属性,在实际使用中,既可以当做异步客户端使用也可以当做一个同步的客户端使用。下面是演示代码:

package com.funtest.grpc

import com.funtester.frame.SourceCode
import com.funtester.fungrpc.HelloRequest
import com.funtester.fungrpc.HelloResponse
import com.funtester.fungrpc.HelloServiceGrpc
import com.google.common.util.concurrent.ListenableFuture
import io.grpc.ManagedChannel
import io.grpc.ManagedChannelBuilder

import java.util.concurrent.ExecutionException

class FutureClient extends SourceCode {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost"8080)
                .usePlaintext()
                .build()

        HelloServiceGrpc.HelloServiceFutureStub helloServiceFutureStub = HelloServiceGrpc.newFutureStub(managedChannel).withCompression("gzip")
        HelloRequest helloRequest = HelloRequest.newBuilder()
                .setName("FunTester")
                .build()
        //同步客户端的使用方式
        5.times {
            ListenableFuture<HelloResponse> helloResponseListenableFuture = helloServiceFutureStub.executeHi(helloRequest)
            HelloResponse helloResponse = helloResponseListenableFuture.get()
            output(helloResponse.getMsg())
        }
        //异步客户端的使用方式
        def res = []
        5.times {
            ListenableFuture<HelloResponse> helloResponseListenableFuture = helloServiceFutureStub.executeHi(helloRequest)
            res << helloResponseListenableFuture
        }

        res.each {
            output(it.get().getMsg())
        }

        managedChannel.shutdown()
    }

}

控制台输出:

21:03:07.312 main 
  ###### #     #  #    # ####### ######  #####  ####### ######  #####
  #      #     #  ##   #    #    #       #         #    #       #    #
  ####   #     #  # #  #    #    ####    #####     #    ####    #####
  #      #     #  #  # #    #    #            #    #    #       #   #
  #       #####   #    #    #    ######  #####     #    ######  #    #

21:03:09.226 main 你好 FunTester2022-05-09 21:03:08
21:03:10.232 main 你好 FunTester2022-05-09 21:03:09
21:03:11.238 main 你好 FunTester2022-05-09 21:03:10
21:03:12.247 main 你好 FunTester2022-05-09 21:03:11
21:03:13.255 main 你好 FunTester2022-05-09 21:03:12
21:03:14.262 main 你好 FunTester2022-05-09 21:03:13
21:03:14.281 main 你好 FunTester2022-05-09 21:03:13
21:03:14.281 main 你好 FunTester2022-05-09 21:03:13
21:03:14.282 main 你好 FunTester2022-05-09 21:03:13
21:03:14.282 main 你好 FunTester2022-05-09 21:03:13

进程已结束,退出代码0

可以看到,前面五个请求是串行的,后面的五个请求是并行的。在实际工作中,使用到异步调用又要处理结果的地方也是这种类型使用较多,而使用Java的线程同步类,往往比较麻烦也不够优雅。

Have Fun ~ Tester !