gRPC拦截器的实现
服务端拦截器
grpc接口服务端拦截器,一般用来做一些预处理及后处理操作。如下,举两个常用的例子。
1.在微服务之间使用gRPC互相调用的时候,会传入一些公共的与业务不相关的元数据,这些数据就很适合在拦截器中实现。
如下服务端的拦截器将gRPC client传入的数据放入gRPC的context中,接口中就可以使用ctx.Value去获取该数据。
// MetaDataInterceptor get grpc server info, requestId/traceId/LogId
func MetaServerDataInterceptor() grpc.UnaryServerInterceptor {
// 拦截器函数签名
// @params ctx Grpc context
// @params req grpc request
// @params info grpc request info
// @params handler the grpc method
return func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
// do what you want to do
// get metadata from grpc client
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.Pairs()
}
// Set request info for context.
// define your key
for _, key := range []string{"requestId"} {
value := md.Get(key)
// ignore it if not exists.
if len(value) >= 1 {
// set value to context. you can use ctx.Value to get it from your grpc method
ctx = context.WithValue(ctx, key, value[0])
}
}
// next
return handler(ctx, req)
}
}
1.在实际的环境中,经常会需要在gRPC 接口之前之后做一些处理。比如,在开始之前记录时间,执行之后记录耗时操作;执行之后判断执行结果等等
如下所示,实现了一个记录接口耗时功能的拦截器,当然实际不会这么low。
// API time elas time get grpc server info
func APITimeInterceptor() grpc.UnaryServerInterceptor {
// 拦截器签名
return func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {
// do what you want to do
start := time.Now().UnixNano()
// do gRPC method
ret := handler(ctx, req)
// do what you want after the grpc method
fmt.Println(time.Now().UnixNano() - start)
return ret
}
}
服务端流式接口拦截器
在golang的gRPC中,普通接口与stream接口的拦截器,需要分别实现。以上的拦截器只用于非stream的接口,对于stream接口,以上拦截器是不生效的。流式拦截器函数签名如下:
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
查看流式拦截器可知,stream的context是在ServerStream中的,因此stream 要传递context 需要继承ServerStream并覆盖context。如下所示
// WrappedStream wraps around the embedded grpc.ServerStream, and intercepts the Context
type WrappedStream struct {
grpc.ServerStream // serverStream interface
Ctx *context.Context // 定义ctx,覆盖ServerStream中的context
}
// Context override the context method and can config the context manually
func (c WrappedStream) Context() context.Context {
return *c.Ctx
}
// NewWrappedStream wrapper the grpc.ServerStream
func NewWrappedStream(s grpc.ServerStream, ctx *context.Context) grpc.ServerStream {
wrapper := &WrappedStream{
ServerStream: s,
Ctx: ctx,
}
stream := grpc.ServerStream(wrapper)
return stream
}
实现该封装之后,就可以将上层的context获取并将元数据写入context后,调用NewWrappedStream传入gRPC的接口调用中。如下所示
1.流式拦截器实现元数据的传递
// stream method to get meta data
func MetaStreamServerInterceptor() grpc.StreamServerInterceptor {
// 函数签名
return func(
srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 获取当前 grpc context
ctx := ss.Context()
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.Pairs()
}
// Set request info for context.
// define your key
for _, key := range []string{"requestId"} {
value := md.Get(key)
// ignore it if not exists.
if len(value) >= 1 {
// set value to context. you can use ctx.Value to get it from your grpc method
ctx = context.WithValue(ctx, key, value[0])
}
}
// set context to next
return handler(srv, streaminterceptor.NewWrappedStream(ss, &ctx))
}
}
gRPC客户端拦截器
gRPC客户端拦截器是在调用gRPC接口之前与之后执行的操作。比如,元数据需要在请求接口之前塞入到metaData中(http2.0Header),才会传递到gRPC的服务端。如下,将当前接口context中的数据放入header中传入服务端。
// request grpc service with requestId/traceId info.
func MetaClientDataInterceptor() grpc.UnaryClientInterceptor {
// 函数签名
return func(
ctx context.Context,
method string, req, resp interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) (err error) {
// 获取当前header数据,没有则新建一个
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs()
}
for _, key := range keyNames {
value := ctx.Value(key)
if strValue, ok := value.(string); ok && strValue != "" {
md.Set(key, strValue)
}
}
// 将header写入
ctx = metadata.NewOutgoingContext(ctx, md)
// 执行调用
return invoker(ctx, method, req, resp, cc, opts...)
}
}
流式客户端拦截器
Stream client的实现也是比较简单的,与服务端不同的是,客户端的流式拦截器不需要封装一层,可以直接使用。如下,同样实现了元数据传递到服务端的拦截器。
// MetaStreamClientInterceptor get grpc client info, requestId/traceId/LogId for grpc stream server
func MetaStreamClientInterceptor() grpc.StreamClientInterceptor {
// 函数签名
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 从context获取元数据
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs()
}
for _, key := range keyNames {
value := ctx.Value(key)
if strValue, ok := value.(string); ok && strValue != "" {
md.Set(key, strValue)
}
}
// set metadata to ctx
ctx = metadata.NewOutgoingContext(ctx, md)
clientStream, err := streamer(ctx, desc, cc, method, opts...)
return clientStream, err
}
}
总结
拦截器为gRPC的服务端/客户端复用公共模块提供了一种很简单方便的方法,只需要实现对应的拦截器函数,在服务端启动或者客户端连接的时候作为选项传入即可(自行搜索)。需要注意的是,在Golang中,拦截器分为普通接口与流式接口的拦截器,需要分别实现。
1.流式服务端拦截器
gRPC中拦截器流式接口拦截器需要实现如下签名的函数,有兴趣可深入了解下。例子如上所示
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.
// info contains all the information of this RPC the interceptor can operate on. And handler is the
// service method implementation. It is the responsibility of the interceptor to invoke handler to
// complete the RPC.
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error
注意streamServer拦截器如果需要传递context,需要将ServerStream进行封装,覆盖Context 函数
1.普通服务端拦截器
普通方法的拦截器实现比较简单,实现如下签名函数
// @params ctx: grpc context
// @params req: the request params
// @params info: the grpc request info
// @params handler: the real grpc method
func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error)
1.客户端普通拦截器
golang在调用grpc之前执行的公共的操作,比如要把requestId塞到header中。
// @params method: the RPC name
// @params req: the request
// @params resp: the response
// @params cc: the ClientConn on which the RPC was invoked
// @params invoker: the invoker of grpc methor
// @params opts: the option
func(
ctx context.Context,
method string, req, resp interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
)
1.客户端流失拦截器
实现如下签名的函数即可
// @params desc: contains a description of the stream
// @params cc: the ClientConn on which the RPC was invoked
// @params method: the RPC name
// @params streamer: the handler to create a ClientStream and it is the responsibility of the interceptor to call it
// @params opts: the option
func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
以上即为Golang的拦截器实现,可以分为服务端与客户端的拦截器,两端分别有流式拦截器与普通接口拦截器,在使用的时候可根据自己的业务需求实现。