【grpc】 6.中间件&拦截器
拦截器又称为全局钩子、中间件
构建gRPC应用程序时,客户端、服务端在远程方法执行之前,都可以执行一些通用的逻辑。诸如日志打印、参数校验、身份验证、性能度量指标等。gRPC包含多种通信模式,主要分为请求应答模式、流响应模式。因此根据需要拦截RPC的调用类型
gRPC拦截器可以分为两类:Unary Interceptor(一元拦截器)、Streaming Interceptor (流拦截器)。
- Unary Interceptor(一元拦截器)
- Streaming Interceptor (流拦截器)
安装客户端和服务端拦截器分类可分为:
- 服务端 一元拦截器 UnaryServerInterceptor
- 客户端 一元拦截器 UnaryClientInterceptor
- 服务端 流拦截器
- 客户端 流拦截器
gRPC支持多个拦截器组成拦截器链,拦截器链之间按照顺序执行,以责任链模式的方式共同完成代码逻辑。
# Unary Interceptor
一元拦截器
# 1. 服务端拦截器 UnaryServerInterceptor
定义
// 一元拦截器 type UnaryServerInterceptor func( ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler ) (resp interface{}, err error) // 类型定义 // info type UnaryServerInfo struct { // Server is the service implementation the user provides. This is read-only. Server interface{} // FullMethod is the full RPC method string, i.e., /package.service/method. FullMethod string } // handler type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20参数名称 参数说明 ctx context执行上下文 req 客户端的request请求,包含请求参数 info 服务名称, 通过该参数可以得知调用方法 handler 客户端调用的gRPC方法(洋葱模型) resp response 返回信息 代码示例
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { log.Printf("grpc server interceptor prehandler") m, err := handler(ctx, req) if err != nil { log.Printf("RPC failed with error %v", err) } log.Printf("grpc server interceptor posthandler") return m, err } //在调用NewServer时 传入拦截器 s := grpc.NewServer( grpc.UnaryInterceptor(unaryInterceptor), )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 2. 客户端拦截器 UnaryClientInterceptor
定义
// 客户端一元拦截器 type UnaryClientInterceptor func( ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption ) error
1
2
3
4
5
6
7
8
9
10参数名称 参数说明 ctx context执行上下文 method gRPC方法名称 req request body reply 服务端返回response cc client connectio invoker 调用的gRPC方法。如果执行此操作,则执行RPC opts 可选参数 代码实现
func unaryInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { log.Printf("grpc client interceptor prehandler") start := time.Now() err := invoker(ctx, method, req, reply, cc, opts...) end := time.Now() log.Printf("grpc client interceptor posthandler RPC: %s, start time: %s, end time: %s, err: %v", method, start.Format("Basic"), end.Format(time.RFC3339), err) return err } //客户端在建立连接时 传入拦截器 conn, err := grpc.Dial(*addr,grpc.WithUnaryInterceptor(unaryInterceptor))
1
2
3
4
5
6
7
8
9
10
11
12
# Streaming Interceptor
流式拦截器
# 1. 服务端拦截器 StreamServerInterceptor
定义
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
1代码实现
// 实现ServerStream接口 type interceptorStream struct { grpc.ServerStream } // 实现ServerStream接口 RecvMsg 方法 func (w *interceptorStream) RecvMsg(m interface{}) error { log.Printf("Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339)) return w.ServerStream.RecvMsg(m) } // 实现ServerStream接口 SendMsg 方法 func (w *interceptorStream) SendMsg(m interface{}) error { log.Printf("Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339)) return w.ServerStream.SendMsg(m) } // 自定义流式拦截器 func logStreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { log.Println("[pre stream] my stream server interceptor 1: ", info.FullMethod) err := handler(srv, &interceptorStream{ss}) log.Println("[post stream] my stream server interceptor 1: ") return err } grpcServer := grpc.NewServer(grpc.ChainStreamInterceptor(logStreamServerInterceptor))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 2. 客户端拦截器 StreamClientInterceptor
定义
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
1代码实现
func logClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { log.Printf("before invoker. method: %+v, StreamDesc:%+v", method, desc) clientStream, err := streamer(ctx, desc, cc, method, opts...) log.Printf("before invoker. method: %+v", method) return clientStream, err } conn,err := grpc.Dail(*addr, grpc.WithChainStreamInterceptor(logClientInterceptor))
1
2
3
4
5
6
7
8
9
# 三方库
https://github.com/grpc-ecosystem/go-grpc-middleware
上次更新: 2023/05/04, 15:30:48