vlambda博客
学习文章列表

gRPC拦截器的实现

服务端拦截器

grpc接口服务端拦截器,一般用来做一些预处理及后处理操作。如下,举两个常用的例子。

1.在微服务之间使用gRPC互相调用的时候,会传入一些公共的与业务不相关的元数据,这些数据就很适合在拦截器中实现。

如下服务端的拦截器将gRPC client传入的数据放入gRPC的context中,接口中就可以使用ctx.Value去获取该数据。

// MetaDataInterceptor get grpc server info, requestId/traceId/LogIdfunc MetaServerDataInterceptor() grpc.UnaryServerInterceptor { // 拦截器函数签名 // @params ctx Grpc context // @params req grpc request // @params info grpc request info // @params handler the grpc method return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// do what you want to do // get metadata from grpc client md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.Pairs() } // Set request info for context. // define your key for _, key := range []string{"requestId"} { value := md.Get(key) // ignore it if not exists. if len(value) >= 1 { // set value to context. you can use ctx.Value to get it from your grpc method ctx = context.WithValue(ctx, key, value[0]) } } // next return handler(ctx, req) }}

1.在实际的环境中,经常会需要在gRPC 接口之前之后做一些处理。比如,在开始之前记录时间,执行之后记录耗时操作;执行之后判断执行结果等等

如下所示,实现了一个记录接口耗时功能的拦截器,当然实际不会这么low。

// API time elas time get grpc server infofunc APITimeInterceptor() grpc.UnaryServerInterceptor { // 拦截器签名 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// do what you want to do start := time.Now().UnixNano() // do gRPC method ret := handler(ctx, req) // do what you want after the grpc method fmt.Println(time.Now().UnixNano() - start) return ret }}

服务端流式接口拦截器

在golang的gRPC中,普通接口与stream接口的拦截器,需要分别实现。以上的拦截器只用于非stream的接口,对于stream接口,以上拦截器是不生效的。流式拦截器函数签名如下:

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

查看流式拦截器可知,stream的context是在ServerStream中的,因此stream 要传递context 需要继承ServerStream并覆盖context。如下所示

// WrappedStream wraps around the embedded grpc.ServerStream, and intercepts the Contexttype WrappedStream struct { grpc.ServerStream // serverStream interface Ctx *context.Context // 定义ctx,覆盖ServerStream中的context}
// Context override the context method and can config the context manuallyfunc (c WrappedStream) Context() context.Context { return *c.Ctx}
// NewWrappedStream wrapper the grpc.ServerStreamfunc NewWrappedStream(s grpc.ServerStream, ctx *context.Context) grpc.ServerStream { wrapper := &WrappedStream{ ServerStream: s, Ctx: ctx, } stream := grpc.ServerStream(wrapper) return stream}

实现该封装之后,就可以将上层的context获取并将元数据写入context后,调用NewWrappedStream传入gRPC的接口调用中。如下所示

1.流式拦截器实现元数据的传递

// stream method to get meta datafunc MetaStreamServerInterceptor() grpc.StreamServerInterceptor { // 函数签名 return func( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { // 获取当前 grpc context ctx := ss.Context() md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.Pairs() } // Set request info for context. // define your key for _, key := range []string{"requestId"} { value := md.Get(key) // ignore it if not exists. if len(value) >= 1 { // set value to context. you can use ctx.Value to get it from your grpc method ctx = context.WithValue(ctx, key, value[0]) } } // set context to next  return handler(srv, streaminterceptor.NewWrappedStream(ss, &ctx)) }}

gRPC客户端拦截器

gRPC客户端拦截器是在调用gRPC接口之前与之后执行的操作。比如,元数据需要在请求接口之前塞入到metaData中(http2.0Header),才会传递到gRPC的服务端。如下,将当前接口context中的数据放入header中传入服务端。

// request grpc service with requestId/traceId info.func MetaClientDataInterceptor() grpc.UnaryClientInterceptor { // 函数签名 return func( ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, ) (err error) { // 获取当前header数据,没有则新建一个 md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.Pairs() } for _, key := range keyNames { value := ctx.Value(key) if strValue, ok := value.(string); ok && strValue != "" {
md.Set(key, strValue) } } // 将header写入 ctx = metadata.NewOutgoingContext(ctx, md) // 执行调用 return invoker(ctx, method, req, resp, cc, opts...) }}

流式客户端拦截器

Stream client的实现也是比较简单的,与服务端不同的是,客户端的流式拦截器不需要封装一层,可以直接使用。如下,同样实现了元数据传递到服务端的拦截器。

// MetaStreamClientInterceptor get grpc client info, requestId/traceId/LogId for grpc stream serverfunc MetaStreamClientInterceptor() grpc.StreamClientInterceptor { // 函数签名 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 从context获取元数据 md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.Pairs() } for _, key := range keyNames { value := ctx.Value(key) if strValue, ok := value.(string); ok && strValue != "" { md.Set(key, strValue) } } // set metadata to ctx ctx = metadata.NewOutgoingContext(ctx, md)
clientStream, err := streamer(ctx, desc, cc, method, opts...)
return clientStream, err }}

总结

拦截器为gRPC的服务端/客户端复用公共模块提供了一种很简单方便的方法,只需要实现对应的拦截器函数,在服务端启动或者客户端连接的时候作为选项传入即可(自行搜索)。需要注意的是,在Golang中,拦截器分为普通接口与流式接口的拦截器,需要分别实现。

1.流式服务端拦截器

gRPC中拦截器流式接口拦截器需要实现如下签名的函数,有兴趣可深入了解下。例子如上所示

// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.// info contains all the information of this RPC the interceptor can operate on. And handler is the// service method implementation. It is the responsibility of the interceptor to invoke handler to// complete the RPC.func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

注意streamServer拦截器如果需要传递context,需要将ServerStream进行封装,覆盖Context 函数

1.普通服务端拦截器

普通方法的拦截器实现比较简单,实现如下签名函数

// @params ctx: grpc context// @params req: the request params// @params info: the grpc request info// @params handler: the real grpc methodfunc(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)

1.客户端普通拦截器

golang在调用grpc之前执行的公共的操作,比如要把requestId塞到header中。

// @params method: the RPC name// @params req: the request// @params resp: the response// @params cc: the ClientConn on which the RPC was invoked// @params invoker: the invoker of grpc methor// @params opts: the optionfunc( ctx context.Context, method string, req, resp interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption, )

1.客户端流失拦截器

实现如下签名的函数即可

// @params desc: contains a description of the stream// @params cc: the ClientConn on which the RPC was invoked// @params method: the RPC name// @params streamer: the handler to create a ClientStream and it is the responsibility of the interceptor to call it// @params opts: the optionfunc(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

以上即为Golang的拦截器实现,可以分为服务端与客户端的拦截器,两端分别有流式拦截器与普通接口拦截器,在使用的时候可根据自己的业务需求实现。

知你码农
在这喧嚣的城市,我们需要点滴时间,需要一隅之地,去聆听自己,聆听自己的心声,听从自己内心的安排,茫茫人海,这里很安静。
12篇原创内容
Official Account