vlambda博客
学习文章列表

gRPC中Header传值与错误拦截处理【知识笔记】

目录

一、Header传值
1.客户端实现拦截器
2.客户端注入拦截器
3.服务端实现拦截器
4.服务端注入拦截器
5.输出信息
二、错误信息处理
1.服务端设置错误信息
2.BlockingStub获取错误信息
3.FutureStub-Direct获取错误信息
4.FutureStub-Callback获取错误信息
5.asyncCall获取错误信息
6.advancedAsyncCall获取错误信息
7.异常信息抽取
三、示例代码
四、系列文章

上篇中分析了gPRC支持的四种类型示例,本文继续示例解读,Header传值、错误处理。

一、Header传值

在RPC的服务调用中,往往需要在链路中通过透传一些值。gRPC同样提供了通过Header透传元数据新信息。

1.客户端实现拦截器

客户端拦截器需要实现ClientInterceptor接口,看以下示例.

public classHeaderClientInterceptorimplementsClientInterceptor{
private static final Logger logger = Logger.getLogger(HeaderClientInterceptor.class.getName());

@VisibleForTesting
static final Metadata.Key<String> CUSTOM_HEADER_KEY =
Metadata.Key.of("custom_client_header_key", Metadata.ASCII_STRING_MARSHALLER);

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next)
{
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {

@Override
publicvoidstart(Listener<RespT> responseListener, Metadata headers) {
// @1 在Header中设置需要透传的值
headers.put(CUSTOM_HEADER_KEY, "customRequestValue");
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
publicvoidonHeaders(Metadata headers) {
// @2 获取从服务端返回的Header信息
logger.info("header received from server:" + headers);
super.onHeaders(headers);
}
}, headers);
}
};
}
}
2.客户端注入拦截器

将拦截器注入到gRPC服务中,见示例。

privateCustomHeaderClient(String host, int port) {
originChannel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build();
ClientInterceptor interceptor = new HeaderClientInterceptor();
// @1 构建Channel时注入客户端拦截器
Channel channel = ClientInterceptors.intercept(originChannel, interceptor);
blockingStub = GreeterGrpc.newBlockingStub(channel);
}
3.服务端实现拦截器

服务端拦截器需实现ServerInterceptor接口,见示例。

public classHeaderServerInterceptorimplementsServerInterceptor{

private static final Logger logger = Logger.getLogger(HeaderServerInterceptor.class.getName());

@VisibleForTesting
static final Metadata.Key<String> CUSTOM_HEADER_KEY =
Metadata.Key.of("custom_server_header_key", Metadata.ASCII_STRING_MARSHALLER);


@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next)
{
// @1 打印从客户端设置的header信息
logger.info("header received from client:" + requestHeaders);
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
publicvoidsendHeaders(Metadata responseHeaders) {
// @2 响应客户端设置服务端Header信息
responseHeaders.put(CUSTOM_HEADER_KEY, "customRespondValue");
super.sendHeaders(responseHeaders);
}
}, requestHeaders);
}
}
4.服务端注入拦截器
privatevoidstart() throws IOException {
server = ServerBuilder.forPort(PORT)
// @1 构建Server时注入自定义拦截器
.addService(ServerInterceptors.intercept(new GreeterImpl(), new HeaderServerInterceptor()))
.build()
.start();
logger.info("Server started, listening on " + PORT);

}
5.输出信息
// @1 Server输出
header received from client:Metadata(content-type=application/grpc,user-agent=grpc-java-netty/1.25.0,custom_client_header_key=customRequestValue,grpc-accept-encoding=gzip)
// @2 client输出
header received from server:Metadata(content-type=application/grpc,custom_server_header_key=customRespondValue,grpc-encoding=identity,grpc-accept-encoding=gzip)

小结:gRPC上下游Header传值通过客户端实现ClientInterceptor和服务端实现ServerInterceptor来实现,并在client和server构造时注入拦截器。

二、错误信息处理

当Server抛出错误时,需要将错误信息返回给Client调用方,同时可以自定义错误信息;gRPC提供了相关方法。

1.服务端设置错误信息
private static final Metadata.Key<DebugInfo> DEBUG_INFO_TRAILER_KEY =
ProtoUtils.keyForProto(DebugInfo.getDefaultInstance());

private static final DebugInfo DEBUG_INFO =
DebugInfo.newBuilder()
.addStackEntries("stack_entry_1")
.addStackEntries("stack_entry_2")
.addStackEntries("stack_entry_3")
.setDetail("detailed error info.").build();
private static final String DEBUG_DESC = "detailed error description";
Server server = ServerBuilder.forPort(0).addService(new GreeterGrpc.GreeterImplBase() {
@Override
publicvoidsayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
// @1 自定义错误信息通过Metadata设置
Metadata trailers = new Metadata();
trailers.put(DEBUG_INFO_TRAILER_KEY, DEBUG_INFO);
// @2 Server通过onError将错误信息传输给Client
responseObserver.onError(Status.INTERNAL.withDescription(DEBUG_DESC)
.asRuntimeException(trailers));
// @3 通过asRuntimeException将自定义的异常信息传给Client
}
}).build().start();
2.BlockingStub获取错误信息
voidblockingCall() {
GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
try {
stub.sayHello(HelloRequest.newBuilder().build());
} catch (Exception e) {
// @1 接受Server端返回的异常信息
verifyErrorReply(e);
}
}
3.FutureStub-Direct获取错误信息
voidfutureCallDirect() {
GreeterFutureStub stub = GreeterGrpc.newFutureStub(channel);
ListenableFuture<HelloReply> response =
stub.sayHello(HelloRequest.newBuilder().build());

try {
response.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
// @1 此处解析Server端传输的异常信息
verifyErrorReply(e.getCause());
}
}
4.FutureStub-Callback获取错误信息
voidfutureCallCallback() {
GreeterFutureStub stub = GreeterGrpc.newFutureStub(channel);
ListenableFuture<HelloReply> response =
stub.sayHello(HelloRequest.newBuilder().build());
final CountDownLatch latch = new CountDownLatch(1);
Futures.addCallback(
response,
new FutureCallback<HelloReply>() {
@Override
publicvoidonSuccess(@Nullable HelloReply result) {
}

@Override
publicvoidonFailure(Throwable t) {
// @1 此处解析Server端传输的异常信息
verifyErrorReply(t);
latch.countDown();
}
},
directExecutor());
if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) {
throw new RuntimeException("timeout!");
}
}
5.asyncCall获取错误信息
voidasyncCall() {
GreeterStub stub = GreeterGrpc.newStub(channel);
HelloRequest request = HelloRequest.newBuilder().build();
final CountDownLatch latch = new CountDownLatch(1);
StreamObserver<HelloReply> responseObserver = new StreamObserver<HelloReply>() {

@Override
publicvoidonNext(HelloReply value) {
}

@Override
publicvoidonError(Throwable t) {
// @1 处理Server返回的异常信息
verifyErrorReply(t);
latch.countDown();
}

@Override
publicvoidonCompleted() {
}
};
stub.sayHello(request, responseObserver);

if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) {
throw new RuntimeException("timeout!");
}
}
6.advancedAsyncCall获取错误信息
void advancedAsyncCall() {
ClientCall<HelloRequest, HelloReply> call =
channel.newCall(GreeterGrpc.getSayHelloMethod(), CallOptions.DEFAULT);

final CountDownLatch latch = new CountDownLatch(1);

call.start(new ClientCall.Listener<HelloReply>() {

@Override
public void onClose(Status status, Metadata trailers) {
// @1 处理Server端返回的异常信息
Verify.verify(status.getCode() == Status.Code.INTERNAL);
Verify.verify(trailers.containsKey(DEBUG_INFO_TRAILER_KEY));
try {
Verify.verify(trailers.get(DEBUG_INFO_TRAILER_KEY).equals(DEBUG_INFO));
} catch (IllegalArgumentException e) {
throw new VerifyException(e);
}
latch.countDown();
}
}, new Metadata());

call.sendMessage(HelloRequest.newBuilder().build());
call.halfClose();

if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) {
throw new RuntimeException("timeout!");
}
}
7.异常信息抽取

io.grpc.Status提供了异常读取工具方法。

staticvoidverifyErrorReply(Throwable t) {
// @1 解析Server异常信息
Status status = Status.fromThrowable(t);
// @2 获取Server自定义异常元数据信息
Metadata trailers = Status.trailersFromThrowable(t);
Verify.verify(status.getCode() == Status.Code.INTERNAL);
Verify.verify(trailers.containsKey(DEBUG_INFO_TRAILER_KEY));
Verify.verify(status.getDescription().equals(DEBUG_DESC));
try {
Verify.verify(trailers.get(DEBUG_INFO_TRAILER_KEY)
.equals(DEBUG_INFO));
} catch (IllegalArgumentException e) {
throw new VerifyException(e);
}
}

小结:Client对Server返回异常信息的处理,除了返回Server异常状态码和描述信息外,还可以通过Metadata自定义更多的异常信息。Client提供了5种调用方式解析Server异常错误信息,同时也提供了很好的使用示例范本。

三、示例代码

https://github.com/grpc/grpc-java/tree/master/examples/src/main/java/io/grpc/examples

四、系列文章







「瓜农老梁  学习同行」