vlambda博客
学习文章列表

gRPC背压流控、压缩及JSON通信【知识笔记】

目录

一、压缩
1.Server端所有方法压缩
2.Server单独方法压缩
3.Client请求内容压缩
二、使用JSON通信
1.方法描述使用JSON编译
2.JSON编译具体过程
三、手动流量控制
1.Consuming Side
2.Producing Side
四、系列文章

本文继续整理gRPC的使用,走查解读官方给出的压缩示例、使用JSON通信以及手动流量控制。

一、压缩

1.Server端所有方法压缩
server = ServerBuilder.forPort(port)
.intercept(new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// @1 在拦截器中设置压缩算法
call.setCompression("gzip");
return next.startCall(call, headers);
}
})
.addService(new GreeterImpl())
.build()
.start();

备注:如果需要在Server端所有方法进行压缩,可以在ServerInterceptor拦击器中通过setCompression进行设置。

2.Server单独方法压缩

如果不想对所有的方法传输内容压缩,gPRC提供了单独方法的压缩。

int port = 50051;
server = ServerBuilder.forPort(port)
.addService(new GreeterImpl())
.build()
.start();
static classGreeterImplextendsGreeterGrpc.GreeterImplBase{

@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> plainResponseObserver) {
ServerCallStreamObserver<HelloReply> responseObserver =
(ServerCallStreamObserver<HelloReply>) plainResponseObserver;
// @1 对单个方法传输内容进行压缩
responseObserver.setCompression("gzip");
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}

备注:单个方法的压缩通过ServerCallStreamObserver的setCompression进行单独设置。

3.Client请求内容压缩

客户端对请求内容进行压缩,下面示例通过gzip进行压缩。

publicvoidgreet(String name) {
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
HelloReply response;
try {
// @1 对请求内容设置压缩类型
response = blockingStub.withCompression("gzip").sayHello(request);
} catch (StatusRuntimeException e) {
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
return;
}

二、使用JSON通信

gRPC可以通过Json格式进行通信,虽然并不建议这么做,Json的效率要远低于ProtoBuf。看下示例是如何通过Json格式通信的。

1.方法描述使用JSON编译

对方法的出参和入参使用JSON适配器,示例中通过MethodDescriptor.toBuilder重写出入参数的解析格式。

static final MethodDescriptor<HelloRequest, HelloReply> METHOD_SAY_HELLO =
GreeterGrpc.getSayHelloMethod()
.toBuilder(
// @1 请求参数使用JSON编译 JsonMarshaller.jsonMarshaller(HelloRequest.getDefaultInstance()),
// @2 返回参数使用JSON编译

JsonMarshaller.jsonMarshaller(HelloReply.getDefaultInstance()))
.build();
2.JSON编译具体过程

既然通过对方法的出入参数编译成JSON格式,看下gRPC是如何做的呢?

public static <T extends Message> Marshaller<T> jsonMarshaller(final T defaultInstance) {
final Parser parser = JsonFormat.parser();
final Printer printer = JsonFormat.printer();
return jsonMarshaller(defaultInstance, parser, printer);
}
public static <T extends Message> Marshaller<T> jsonMarshaller(
final T defaultInstance, final Parser parser, final Printer printer)
{
final Charset charset = Charset.forName("UTF-8");
return new Marshaller<T>() {
@Override
public InputStream stream(T value) {
try {
// @1 通过printer.print将出入参数转换为JSON格式
return new ByteArrayInputStream(printer.print(value).getBytes(charset));
} catch (InvalidProtocolBufferException e) {
// ...
}
}
// ....
}

备注:在JsonFormat.print方法中进行具体的请求/返回参数转换为JSON的具体实现。

请求转换JSON格式截图


返回转换JSON格式截图


3.Client使用JSON格式的方法描述
public HelloReply sayHello(HelloRequest request) {
// @1 使用JSON格式的方法描述METHOD_SAY_HELLO
return blockingUnaryCall(
getChannel(), METHOD_SAY_HELLO, getCallOptions(), request);
}
4.Server使用JSON格式的方法描述
public ServerServiceDefinition bindService() {
return ServerServiceDefinition
.builder(GreeterGrpc.getServiceDescriptor().getName())
// @1 使用JSON格式的方法描述METHOD_SAY_HELLO
.addMethod(HelloJsonClient.HelloJsonStub.METHOD_SAY_HELLO,
asyncUnaryCall(
new UnaryMethod<HelloRequest, HelloReply>() {
@Override
publicvoidinvoke(
HelloRequest request, StreamObserver<HelloReply> responseObserver)
{
sayHello(request, responseObserver);
}
}))
.build();
}

三、手动流量控制

gRPC的流量控制基于HTTP/2的流量控制,即背压模式。关于gRPC和HTTP/2背压模式原理和关系,请看下面摘录。

At the bottom is the HTTP/2's byte-based flow control. HTTP/2 works on streams of bytes and is completely unaware of gRPC messages or reactive streams. By default, the stream consumer allocates a budget of 65536 bytes.

The stream producer can send up to this many bytes before backpressure engages. As the consumer reads bytes, WINDOW_UPDATE messages are sent to the producer to increase its send budget.


In the middle is the gRPC-Java message-based flow control. gRPC's flow control adapts the stream-based flow control of HTTP/2 to a message-based flow control model.

Importantly, gRPC's flow control is aware of how it interacts with HTTP/2 and the network.


On producing side, an on-ready handler reads a message, serializes it into bytes using protobuf, and then queues it up for transmission over the HTTP/2 byte stream.

If there is insuficient room in the HTTP/2 flow control window to transmit, backpressure engages an no more messages are requested from the producer until space becomes available.


On the consuming side, each time a consumer calls request(x), gRPC attempts to read and deserialize x messages from the HTTP/2 stream.

Since the size of a protobuf encoded message is variable, there is not a one-to-one correlation between pulling messages from gRPC and pulling bytes over HTTP/2.

1.Consuming Side
publicstaticvoidmain(String[] args) throws InterruptedException, IOException {
// @1 Server端服务实现
StreamingGreeterGrpc.StreamingGreeterImplBase svc = new StreamingGreeterGrpc.StreamingGreeterImplBase() {
@Override
public StreamObserver<HelloRequest> sayHelloStreaming(final StreamObserver<HelloReply> responseObserver) {
final ServerCallStreamObserver<HelloReply> serverCallStreamObserver =
(ServerCallStreamObserver<HelloReply>) responseObserver;
// @2 禁止自动流控模式,开启手动流控
serverCallStreamObserver.disableAutoInboundFlowControl();
// @3 背压模式流控,当消费端有足够空间时将会回调OnReadyHandler
// 默认空间大小为65536字节

classOnReadyHandlerimplementsRunnable{
private boolean wasReady = false;

@Override
publicvoidrun() {
if (serverCallStreamObserver.isReady() && !wasReady) {
wasReady = true;
logger.info("READY");
// @4 向HTTP/2流请求读取并解压(x)条消息
// 即发信号通知发送端发送继续发消息

serverCallStreamObserver.request(1);
}
}
}
final OnReadyHandler onReadyHandler = new OnReadyHandler();
serverCallStreamObserver.setOnReadyHandler(onReadyHandler);
// @5 处理具体进来的请求
return new StreamObserver<HelloRequest>() {
@Override
publicvoidonNext(HelloRequest request) {
try {
String name = request.getName();
logger.info("--> " + name);
Thread.sleep(100);
String message = "Hello " + name;
logger.info("<-- " + message);
HelloReply reply = HelloReply.newBuilder().setMessage(message).build();
// @6 向Client发送请求
responseObserver.onNext(reply);
if (serverCallStreamObserver.isReady()) {
// @7 向HTTP/2流请求读取并解压(x)条消息
serverCallStreamObserver.request(1);
} else {
onReadyHandler.wasReady = false;
}
} catch (Throwable throwable) {
//
}
}
@Override
publicvoidonError(Throwable t) {
t.printStackTrace();
responseObserver.onCompleted();
}
@Override
publicvoidonCompleted() {
logger.info("COMPLETED");
responseObserver.onCompleted();
}
};
}
};

final Server server = ServerBuilder
.forPort(50051)
.addService(svc)
.build()
.start();
2.Producing Side
publicstaticvoidmain(String[] args) throws InterruptedException {
final CountDownLatch done = new CountDownLatch(1);
ManagedChannel channel = ManagedChannelBuilder
.forAddress("localhost", 50051)
.usePlaintext()
.build();
StreamingGreeterGrpc.StreamingGreeterStub stub = StreamingGreeterGrpc.newStub(channel);

ClientResponseObserver<HelloRequest, HelloReply> clientResponseObserver =
new ClientResponseObserver<HelloRequest, HelloReply>() {
ClientCallStreamObserver<HelloRequest> requestStream;
@Override
publicvoidbeforeStart(final ClientCallStreamObserver<HelloRequest> requestStream) {
this.requestStream = requestStream;
// @1设置手动流量控制
requestStream.disableAutoInboundFlowControl();
// @2 当Consumer端有足够空间时自动回调
// 序列化protobuf先发送到缓存区(还未到Server端)
// Server端需要调用request()向Client拉取消息

requestStream.setOnReadyHandler(new Runnable() {
Iterator<String> iterator = names().iterator();
@Override
publicvoidrun() {
while (requestStream.isReady()) {
if (iterator.hasNext()) {
String name = iterator.next();
logger.info("--> " + name);
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
// @3 将消息发送到缓存区
requestStream.onNext(request);
} else {
// @4 标记Client发送完成
requestStream.onCompleted();
}
}
}
});
}

@Override
publicvoidonNext(HelloReply value) {
// @5 接受Server端返回信息
logger.info("<-- " + value.getMessage());
// @6 通知Client继续发送
requestStream.request(1);
}

@Override
publicvoidonError(Throwable t) {
t.printStackTrace();
done.countDown();
}

@Override
publicvoidonCompleted() {
logger.info("All Done");
done.countDown();
}
};
stream processing.
stub.sayHelloStreaming(clientResponseObserver);

done.await();

channel.shutdown();
channel.awaitTermination(1, TimeUnit.SECONDS);
}


四、系列文章







「瓜农老梁  学习同行」