gRPC拦截器的实现
服务端拦截器
grpc接口服务端拦截器,一般用来做一些预处理及后处理操作。如下,举两个常用的例子。
1.在微服务之间使用gRPC互相调用的时候,会传入一些公共的与业务不相关的元数据,这些数据就很适合在拦截器中实现。
如下服务端的拦截器将gRPC client传入的数据放入gRPC的context中,接口中就可以使用ctx.Value去获取该数据。
// MetaDataInterceptor get grpc server info, requestId/traceId/LogIdfunc MetaServerDataInterceptor() grpc.UnaryServerInterceptor {// 拦截器函数签名// @params ctx Grpc context// @params req grpc request// @params info grpc request info// @params handler the grpc methodreturn func(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler) (resp interface{}, err error) {// do what you want to do// get metadata from grpc clientmd, ok := metadata.FromIncomingContext(ctx)if !ok {md = metadata.Pairs()}// Set request info for context.// define your keyfor _, key := range []string{"requestId"} {value := md.Get(key)// ignore it if not exists.if len(value) >= 1 {// set value to context. you can use ctx.Value to get it from your grpc methodctx = context.WithValue(ctx, key, value[0])}}// nextreturn handler(ctx, req)}}
1.在实际的环境中,经常会需要在gRPC 接口之前之后做一些处理。比如,在开始之前记录时间,执行之后记录耗时操作;执行之后判断执行结果等等
如下所示,实现了一个记录接口耗时功能的拦截器,当然实际不会这么low。
// API time elas time get grpc server infofunc APITimeInterceptor() grpc.UnaryServerInterceptor {// 拦截器签名return func(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler) (resp interface{}, err error) {// do what you want to dostart := time.Now().UnixNano()// do gRPC methodret := handler(ctx, req)// do what you want after the grpc methodfmt.Println(time.Now().UnixNano() - start)return ret}}
服务端流式接口拦截器
在golang的gRPC中,普通接口与stream接口的拦截器,需要分别实现。以上的拦截器只用于非stream的接口,对于stream接口,以上拦截器是不生效的。流式拦截器函数签名如下:
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
查看流式拦截器可知,stream的context是在ServerStream中的,因此stream 要传递context 需要继承ServerStream并覆盖context。如下所示
// WrappedStream wraps around the embedded grpc.ServerStream, and intercepts the Contexttype WrappedStream struct {grpc.ServerStream // serverStream interfaceCtx *context.Context // 定义ctx,覆盖ServerStream中的context}// Context override the context method and can config the context manuallyfunc (c WrappedStream) Context() context.Context {return *c.Ctx}// NewWrappedStream wrapper the grpc.ServerStreamfunc NewWrappedStream(s grpc.ServerStream, ctx *context.Context) grpc.ServerStream {wrapper := &WrappedStream{ServerStream: s,Ctx: ctx,}stream := grpc.ServerStream(wrapper)return stream}
实现该封装之后,就可以将上层的context获取并将元数据写入context后,调用NewWrappedStream传入gRPC的接口调用中。如下所示
1.流式拦截器实现元数据的传递
// stream method to get meta datafunc MetaStreamServerInterceptor() grpc.StreamServerInterceptor {// 函数签名return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// 获取当前 grpc contextctx := ss.Context()md, ok := metadata.FromIncomingContext(ctx)if !ok {md = metadata.Pairs()}// Set request info for context.// define your keyfor _, key := range []string{"requestId"} {value := md.Get(key)// ignore it if not exists.if len(value) >= 1 {// set value to context. you can use ctx.Value to get it from your grpc methodctx = context.WithValue(ctx, key, value[0])}}// set context to nextreturn handler(srv, streaminterceptor.NewWrappedStream(ss, &ctx))}}
gRPC客户端拦截器
gRPC客户端拦截器是在调用gRPC接口之前与之后执行的操作。比如,元数据需要在请求接口之前塞入到metaData中(http2.0Header),才会传递到gRPC的服务端。如下,将当前接口context中的数据放入header中传入服务端。
// request grpc service with requestId/traceId info.func MetaClientDataInterceptor() grpc.UnaryClientInterceptor {// 函数签名return func(ctx context.Context,method string, req, resp interface{},cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,) (err error) {// 获取当前header数据,没有则新建一个md, ok := metadata.FromOutgoingContext(ctx)if !ok {md = metadata.Pairs()}for _, key := range keyNames {value := ctx.Value(key)if strValue, ok := value.(string); ok && strValue != "" {md.Set(key, strValue)}}// 将header写入ctx = metadata.NewOutgoingContext(ctx, md)// 执行调用return invoker(ctx, method, req, resp, cc, opts...)}}
流式客户端拦截器
Stream client的实现也是比较简单的,与服务端不同的是,客户端的流式拦截器不需要封装一层,可以直接使用。如下,同样实现了元数据传递到服务端的拦截器。
// MetaStreamClientInterceptor get grpc client info, requestId/traceId/LogId for grpc stream serverfunc MetaStreamClientInterceptor() grpc.StreamClientInterceptor {// 函数签名return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer,opts ...grpc.CallOption) (grpc.ClientStream, error) {// 从context获取元数据md, ok := metadata.FromOutgoingContext(ctx)if !ok {md = metadata.Pairs()}for _, key := range keyNames {value := ctx.Value(key)if strValue, ok := value.(string); ok && strValue != "" {md.Set(key, strValue)}}// set metadata to ctxctx = metadata.NewOutgoingContext(ctx, md)clientStream, err := streamer(ctx, desc, cc, method, opts...)return clientStream, err}}
总结
拦截器为gRPC的服务端/客户端复用公共模块提供了一种很简单方便的方法,只需要实现对应的拦截器函数,在服务端启动或者客户端连接的时候作为选项传入即可(自行搜索)。需要注意的是,在Golang中,拦截器分为普通接口与流式接口的拦截器,需要分别实现。
1.流式服务端拦截器
gRPC中拦截器流式接口拦截器需要实现如下签名的函数,有兴趣可深入了解下。例子如上所示
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.// info contains all the information of this RPC the interceptor can operate on. And handler is the// service method implementation. It is the responsibility of the interceptor to invoke handler to// complete the RPC.func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error
注意streamServer拦截器如果需要传递context,需要将ServerStream进行封装,覆盖Context 函数
1.普通服务端拦截器
普通方法的拦截器实现比较简单,实现如下签名函数
// @params ctx: grpc context// @params req: the request params// @params info: the grpc request info// @params handler: the real grpc methodfunc(ctx context.Context,req interface{},info *grpc.UnaryServerInfo,handler grpc.UnaryHandler) (resp interface{}, err error)
1.客户端普通拦截器
golang在调用grpc之前执行的公共的操作,比如要把requestId塞到header中。
// @params method: the RPC name// @params req: the request// @params resp: the response// @params cc: the ClientConn on which the RPC was invoked// @params invoker: the invoker of grpc methor// @params opts: the optionfunc(ctx context.Context,method string, req, resp interface{},cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,)
1.客户端流失拦截器
实现如下签名的函数即可
// @params desc: contains a description of the stream// @params cc: the ClientConn on which the RPC was invoked// @params method: the RPC name// @params streamer: the handler to create a ClientStream and it is the responsibility of the interceptor to call it// @params opts: the optionfunc(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
以上即为Golang的拦截器实现,可以分为服务端与客户端的拦截器,两端分别有流式拦截器与普通接口拦截器,在使用的时候可根据自己的业务需求实现。
