刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • rpc

    • RPC 初识
    • RPC简单实现
    • grpc初识和实现
    • protobuf进阶
    • grpc进阶
      • go grpc的metadata
        • 1. go中使用metadata
        • 2. grpc中使用metadata
      • python 中使用metadata
      • go grpc 拦截器
      • python grpc 拦截器
      • 1.grpc常见错误码
      • 2.demo
      • 3.go 和python 互相错误处理
      • 1. python的超时
      • 2. go的超时
  • grpc

  • protobuf

  • rpc+grpc
  • rpc
bigox
2022-06-16
目录

grpc进阶

# 一. python的grpc结合asyncio

# 方法1

  • 使用grpc 官方库: https://grpc.github.io/grpc/python/grpc_asyncio.html

# 方法2

  • 使用第三方库grpclib:https://github.com/vmagamedov/grpclib
  • pip install grpclib

# 二. grpc metadata

# go grpc的metadata

gRPC让我们可以像本地调用一样实现远程调用,对于每一次的RPC调用中,都可能会有一些有用的数据,而这些数据就可以通过metadata来传递。

metadata是以key-value的形式存储数据的,其中key是string类型,而value是[]string,即一个字符串数组类型。

metadata使得client和server能够为对方提供关于本次调用的一些信息,就像一次http请求的RequestHeader和ResponseHeader一样。http中header的生命周周期是一次http请求,那么metadata的生命周期就是一次RPC调用。

# 1. go中使用metadata

源码:源码 (opens new window)

项目文档:文档 (opens new window)

1.1 新建metadata

MD 类型实际上是map,key是string,value是string类型的slice。

type MD map[string][]string
1

创建的时候可以像创建普通的map类型一样使用new关键字进行创建:

//第一种方式
md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
//第二种方式 key不区分大小写,会被统一转成小写。
md := metadata.Pairs(
    "key1", "val1",
    "key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"}
    "key2", "val2",
)
1
2
3
4
5
6
7
8

1.2. 发送metadata

md := metadata.Pairs("key", "val")

// 新建一个有 metadata 的 context
ctx := metadata.NewOutgoingContext(context.Background(), md)

// 单向 RPC
response, err := client.SomeRPC(ctx, someRequest)
1
2
3
4
5
6
7

1.3. 接收metadata

func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}
1
2
3
4

# 2. grpc中使用metadata

2.1. proto

syntax = "proto3";
option go_package=".;proto";

// The greeting service definition.
service Greeter {
    //   Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

2.2. server

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc/metadata"
    "net"
    "google.golang.org/grpc"
    "OldPackageTest/grpc_test/proto"
)

type Server struct{}

func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,
    error) {

    md, ok := metadata.FromIncomingContext(ctx)
    if ok {
        fmt.Println("get metadata error")
    }
    if nameSlice, ok := md["name"]; ok {
        fmt.Println(nameSlice)
        for i, e := range nameSlice {
            fmt.Println(i, e)
        }
    }
    return &proto.HelloReply{
        Message: "hello " + request.Name,
    }, nil
}

func main() {
    g := grpc.NewServer()
    proto.RegisterGreeterServer(g, &Server{})
    lis, err := net.Listen("tcp", "0.0.0.0:50051")
    if err != nil {
        panic("failed to listen:" + err.Error())
    }
    err = g.Serve(lis)
    if err != nil {
        panic("failed to start grpc:" + err.Error())
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

2.3 client

/*
 * @date: 2021/10/26
 * @desc: ...
 */

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
	"picturePro/grpcTest/proto"
)

func main() {
	conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	c := proto.NewGreeterClient(conn)
	md := metadata.New(map[string]string{
		"name":   "body",
		"passwd": "1234abcd",
	})
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	r, err := c.SayHello(ctx, &proto.HelloRequest{Name: "bobby"})
	if err != nil {
		panic(err)
	}
	fmt.Println(r.Message)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

# python 中使用metadata

1. proto

syntax = "proto3";
option go_package=".;proto";

// The greeting service definition.
service Greeter {
    //   Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

2. server

from __future__ import print_function
from concurrent import futures
import logging

import grpc

import helloworld_pb2
import helloworld_pb2_grpc


class Greeter(helloworld_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        for key, value in context.invocation_metadata():
            print('Received initial metadata: key=%s value=%s' % (key, value))

        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    logging.basicConfig()
    serve()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

3. client

from __future__ import print_function
import logging

import grpc

import helloworld_pb2
import helloworld_pb2_grpc


def run():
    # NOTE(gRPC Python Team): .close() is possible on a channel and should be
    # used in circumstances in which the with statement does not fit the needs
    # of the code.
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response, call = stub.SayHello.with_call(
            helloworld_pb2.HelloRequest(name='you'),
            metadata=(
                ('name', 'bobby'),
                ('password','imooc')
            )

    print("Greeter client received: " + response.message)

if __name__ == '__main__':
    run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 三. 拦截器

  • 拦截器的定位是类似django中间件

# go grpc 拦截器

  • protobuf

    syntax = "proto3";
    option go_package = ".;proto";
    service Greeter {
        rpc SayHello (HelloRequest) returns (HelloReply);
    }
    //将 sessionid放入 放入cookie中 http协议
    message HelloRequest {
        string name = 1;
    }
    
    message HelloReply {
        string message = 1;
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
  • server

    package main
    
    import (
        "context"
        "fmt"
        "net"
    
        "google.golang.org/grpc"
    
        "start/grpc_interceptor/proto"
    )
    
    
    type Server struct{}
    
    func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,
        error){
        return &proto.HelloReply{
            Message: "hello "+request.Name,
        }, nil
    }
    
    
    func main(){
        var interceptor grpc.UnaryServerInterceptor
        interceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
            // 继续处理请求
            fmt.Println("接收到新请求")
            res, err := handler(ctx, req)
            fmt.Println("请求处理完成")
            return res, err
        }
        var opts []grpc.ServerOption
        opts = append(opts, grpc.UnaryInterceptor(interceptor))
    
        g := grpc.NewServer(opts...)
        proto.RegisterGreeterServer(g, &Server{})
        lis, err := net.Listen("tcp", "0.0.0.0:50051")
        if err != nil{
            panic("failed to listen:"+err.Error())
        }
        err = g.Serve(lis)
        if err != nil{
            panic("failed to start grpc:"+err.Error())
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
  • client

    package main
    
    import (
        "context"
        "fmt"
        "google.golang.org/grpc"
        "time"
    
        "start/grpc_interceptor/proto"
    )
    
    func interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        start := time.Now()
        err := invoker(ctx, method, req, reply, cc, opts...)
        fmt.Printf("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, time.Since(start), err)
        return err
    }
    
    func main(){
        //stream
        var opts []grpc.DialOption
    
        opts = append(opts, grpc.WithInsecure())
        // 指定客户端interceptor
        opts = append(opts, grpc.WithUnaryInterceptor(interceptor))
    
        conn, err := grpc.Dial("localhost:50051", opts...)
        if err != nil {
            panic(err)
        }
        defer conn.Close()
    
        c := proto.NewGreeterClient(conn)
        r, err := c.SayHello(context.Background(), &proto.HelloRequest{Name:"bobby"})
        if err != nil {
            panic(err)
        }
        fmt.Println(r.Message)
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39

# python grpc 拦截器

  1. proto
syntax = "proto3";
option go_package = ".;proto";
service Greeter {
    rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
    string name = 1;
}

message HelloReply {
    string message = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
  1. 客户端
import grpc
from datetime import datetime

from grpc_interceptor.proto import helloworld_pb2
from grpc_interceptor.proto import helloworld_pb2_grpc


class DefaultValueClientInterceptor(grpc.UnaryUnaryClientInterceptor):

    def intercept_unary_unary(self, continuation, client_call_details, request):
        start = datetime.now()
        rsp = continuation(client_call_details, request)
        last = datetime.now()-start
        print(last)
        print(last.microseconds/1000)
        # print(f"用时: {datetime.now()-start}")
        return rsp


def run():
    default_value_interceptor = DefaultValueClientInterceptor()
    with grpc.insecure_channel('localhost:50051') as channel:
        intercept_channel = grpc.intercept_channel(channel,
                                                   default_value_interceptor)
        stub = helloworld_pb2_grpc.GreeterStub(intercept_channel)
        response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'))
    print("Greeter client received: " + response.message)


if __name__ == '__main__':
    run()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  1. 服务端
from concurrent import futures
import logging

import grpc

from grpc_interceptor.proto import helloworld_pb2
from grpc_interceptor.proto import helloworld_pb2_grpc



class LogInterceptor(grpc.ServerInterceptor):

    def intercept_service(self, continuation, handler_call_details):
        print("请求开始")
        rsp =  continuation(handler_call_details)
        print("请求结束")
        return rsp

class Greeter(helloworld_pb2_grpc.GreeterServicer):

    def SayHello(self, request, context):
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)


def serve():
    header_validator = LogInterceptor()
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         interceptors=(header_validator,))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

# 四. 使用拦截器完成认证

  • proto

    syntax="proto3";
    option go_package=".;proto";
    service Greeter{
      rpc SayHello(HelloRequest) returns (HelloReply);
    }
    
    message HelloRequest{
      string name = 1;
    }
    message HelloReply{
      string message = 1;
    }
    
    //protoc -I . goods.proto --go_out=plugins=grpc:.
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
  • interceptor

    package interceptor
    
    import (
       "context"
       "fmt"
       "google.golang.org/grpc"
       "google.golang.org/grpc/codes"
       "google.golang.org/grpc/metadata"
       "google.golang.org/grpc/status"
       "time"
    )
    
    func ClientInterceptor(ctx context.Context, method string, req, reply interface{},
       cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
       start := time.Now()
       fmt.Println(method)
       err := invoker(ctx, method, req, reply, cc, opts...)
       fmt.Printf("method=%s req=%v rep=%v duration=%s error=%v\n", method, req, reply, time.Since(start), err)
       return err
    
    }
    
    func ServerInterceptor(ctx context.Context, req interface{},
       info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
       md, ok := metadata.FromIncomingContext(ctx)
       if !ok {
          fmt.Println("get meta data error")
          return resp, status.Error(codes.Unauthenticated, "没有认证信息")
       }
       var (
          userId  string
          userPWD string
       )
    
       if content, ok := md["userid"]; ok {
          userId = content[0]
       }
       if content, ok := md["userpwd"]; ok {
          userPWD = content[0]
       }
       fmt.Println(userId, userPWD)
       if userId != "123456" {
          return resp, status.Error(codes.Unauthenticated, "user id 不正确")
       }
    
       // 继续处理请求
       fmt.Println("接收到新请求")
       res, err := handler(ctx, req)
       fmt.Println("请求处理完成")
       return res, err
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
  • server

    package main
    
    import (
       "context"
       "fmt"
       "google.golang.org/grpc"
       "net"
       interceptor2 "picturePro/grpcAuthTest/interceptor"
       "picturePro/grpcTest/proto"
    )
    
    type Server struct {
    }
    
    func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply, error) {
       return &proto.HelloReply{
          Message: fmt.Sprintf("Hello %s", request.Name),
       }, nil
    }
    
    func main() {
       var interceptor grpc.UnaryServerInterceptor
       interceptor = interceptor2.ServerInterceptor
       var opts []grpc.ServerOption
       opts = append(
          opts,
          grpc.UnaryInterceptor(interceptor),
       )
       g := grpc.NewServer(opts...)
       proto.RegisterGreeterServer(g, &Server{})
       lis, err := net.Listen("tcp", ":8080")
       if err != nil {
          panic(err)
       }
       _ = g.Serve(lis)
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
  • client (自己构造metadata)

    package main
    
    import (
       "context"
       "fmt"
       "google.golang.org/grpc"
       "google.golang.org/grpc/metadata"
       interceptor2 "picturePro/grpcAuthTest/interceptor"
       "picturePro/grpcTest/proto"
    )
    
    func main() {
       var interceptor grpc.UnaryClientInterceptor
       interceptor = interceptor2.ClientInterceptor
       var opts []grpc.DialOption
       opts = append(
          opts,
          grpc.WithInsecure(),
          grpc.WithUnaryInterceptor(interceptor),
       )
    
       conn, err := grpc.Dial("localhost:8080", opts...)
       if err != nil {
          panic(conn)
    
       }
       defer conn.Close()
       client := proto.NewGreeterClient(conn)
    
       md := metadata.New(map[string]string{
          "userid":  "123456",
          "userpwd": "000000",
       })
    
       ctx := metadata.NewOutgoingContext(context.Background(), md)
    
       reply, err := client.SayHello(ctx, &proto.HelloRequest{
          Name: "world!",
       })
       if err != nil {
          panic(err)  
    
       }
       fmt.Println(reply.Message)
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
  • Client2(使用grpc.WithPerRPCCredentials构造)

    package main
    
    import (
       "context"
       "fmt"
       "google.golang.org/grpc"
       interceptor2 "picturePro/grpcAuthTest/interceptor"
       "picturePro/grpcTest/proto"
    )
    
    type credentials struct {
    }
    
    func (c credentials) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
       return map[string]string{
          "userid":  "123456",
          "userpwd": "000000",
       }, nil
    }
    func (c credentials) RequireTransportSecurity() bool {
       return false
    }
    
    func main() {
       var interceptor grpc.UnaryClientInterceptor
       interceptor = interceptor2.ClientInterceptor
       var opts []grpc.DialOption
       opts = append(
          opts,
          grpc.WithInsecure(),
          grpc.WithUnaryInterceptor(interceptor),
          grpc.WithPerRPCCredentials(credentials{}),
       )
    
       conn, err := grpc.Dial("localhost:8080", opts...)
       if err != nil {
          panic(conn)
    
       }
       defer conn.Close()
       client := proto.NewGreeterClient(conn)
    
       //md := metadata.New(map[string]string{
       // "userid":  "123456",
       // "userpwd": "000000",
       //})
       //
       //ctx := metadata.NewOutgoingContext(context.Background(), md)
    
       reply, err := client.SayHello(context.Background(), &proto.HelloRequest{
          Name: "world!",
       })
       if err != nil {
          panic(err)
    
       }
       fmt.Println(reply.Message)
    
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59

# 五. grpc 验证器

  • 使用库 protoc-gen-validate, 此接口不够完全稳定:https://github.com/envoyproxy/protoc-gen-validate

  • 使用教程:

    1. 新建validate.proto文件内容从 https://github.com/envoyproxy/protoc-gen-validate/blob/master/validate/validate.proto 拷贝, 或者使用-I ${GOPATH}/src/github.com/envoyproxy/protoc-gen-validate参数指定validate.proto 路径

    2. 新建helloworl.proto文件

      syntax = "proto3";
      
      import "validate.proto";
      option go_package=".;proto";
      
      service Greeter {
          rpc SayHello (Person) returns (Person);
      }
      
      message Person {
          uint64 id    = 1 [(validate.rules).uint64.gt    = 999];
      
          string email = 2 [(validate.rules).string.email = true];
          string name  = 3 [(validate.rules).string = {
                            pattern:   "^[^[0-9]A-Za-z]+( [^[0-9]A-Za-z]+)*$",max_bytes: 256,}];
      
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
    3. 服务端

      package main
      
      import (
          "context"
          "google.golang.org/grpc/codes"
          "google.golang.org/grpc/status"
          "net"
      
          "google.golang.org/grpc"
      
          "start/pgv_test/proto"
      )
      
      
      type Server struct{}
      
      func (s *Server) SayHello(ctx context.Context, request *proto.Person) (*proto.Person,
          error){
          return &proto.Person{
              Id: 32,
          }, nil
      }
      
      type Validator interface {
          Validate() error
      }
      
      func main(){
          var interceptor grpc.UnaryServerInterceptor
          interceptor = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
              // 继续处理请求
              if r, ok := req.(Validator); ok {
                  if err := r.Validate(); err != nil {
                      return nil, status.Error(codes.InvalidArgument, err.Error())
                  }
              }
      
              return handler(ctx, req)
          }
          var opts []grpc.ServerOption
          opts = append(opts, grpc.UnaryInterceptor(interceptor))
      
          g := grpc.NewServer(opts...)
          proto.RegisterGreeterServer(g, &Server{})
          lis, err := net.Listen("tcp", "0.0.0.0:50051")
          if err != nil{
              panic("failed to listen:"+err.Error())
          }
          err = g.Serve(lis)
          if err != nil{
              panic("failed to start grpc:"+err.Error())
          }
      }
      
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53

# 六. grpc 错误处理

# 1.grpc常见错误码

  • https://github.com/grpc/grpc/blob/master/doc/statuscodes.md

0:Ok:返回成功

1:Canceled:操作已取消

**2:Unknown:未知错误。**如果从另一个地址空间接收到的状态值属 于在该地址空间中未知的错误空间,则可以返回此错误的示例。 没有返回足够的错误信息的API引发的错误也可能会转换为此错误

3:InvalidArgument:表示客户端指定了无效的参数。 请注意,这与FailedPrecondition不同。 它表示无论系统状态如何(例如格式错误的文件名)都有问题的参数

4:DeadlineExceeded:意味着操作在完成之前过期。 对于更改系统状态的操作,即使操作成功完成,也可能会返回此错误。 例如,服务器的成功响应可能会延迟足够的时间以使截止日期到期

5:NotFound:表示找不到某个请求的实体(例如文件或目录)

6:AlreadyExists:表示尝试创建实体失败,因为已经存在

**7:PermissionDenied:表示调用者没有执行指定操作的权限。**它不能用于因耗尽某些资源而引起的拒绝(使用ResourceExhausted代替这些错误)。如果调用者无法识别,则不能使用它(使用Unauthenticated代替这些错误)

8:ResourceExhausted:表示某些资源已耗尽,可能是每个用户的配额,或者整个文件系统空间不足

**9:FailedPrecondition:表示操作被拒绝,因为系统不处于操作执行所需的状态。**例如,要删除的目录可能不是空的,rmdir操作应用于非目录等。可能帮助服务实现者判断FailedPrecondition,Aborted和Unavailable之间的试金石测试:使用不可用如果客户端只能重试失败的呼叫。如果客户端应该在更高级别重试(例如,重新启动读取 - 修改 - 写入序列),则使用中止。如果客户端不应该重试直到系统状态被明确修复,则使用FailedPrecondition。例如,如果“rmdir”由于目录非空而失败,应该返回FailedPrecondition,因为客户端不应该重试,除非他们首先通过从目录中删除文件来修复该目录。如果客户端在资源上执行条件REST获取/更新/删除并且 服务器 (opens new window) 上的资源与条件不匹配,则使用FailedPrecondition。例如,在相同的资源上发生冲突的读取 - 修改 - 写入

10:Aborted:表示操作被中止,通常是由于并发问题(如序列器检查失败,事务异常终止等)造成的。请参阅上面的试金石测试以确定FailedPrecondition,Aborted和Unavailable之间的差异

11:OutOfRange:表示操作尝试超过有效范围。 例如,寻找或阅读文件末尾。 与InvalidArgument不同,此错误表示如果系统状态更改可能会解决的问题。 例如,如果要求读取的偏移量不在[0,2 ^ 32-1]范围内,则32位文件系统将生成InvalidArgument,但如果要求从偏移量读取当前值,则它将生成OutOfRange 文件大小。 FailedPrecondition和OutOfRange之间有相当多的重叠。 我们建议在应用时使用OutOfRange(更具体的错误),以便遍历空间的调用者可以轻松查找OutOfRange错误以检测何时完成

12:Unimplemented:表示此服务中未执行或未支持/启用操作

13:Internal: 意味着底层系统预期的一些不变量已被打破。 如果你看到其中的一个错误,那么事情就会非常糟糕

**14:Unavailable:表示服务当前不可用。**这很可能是一种暂时性情况,可能会通过退避重试来纠正。请参阅上面的试金石测试以确定FailedPrecondition,Aborted和Unavailable之间的差异

15:DataLoss:指示不可恢复的数据丢失或损坏

16:Unauthenticated:表示请求没有有效的操作认证凭证

17:_maxCode:这个是最大的状态码

常见http状态码

  • 200 成功状态吗
  • 301 临时重定向
  • 302 永久重定向
  • 400 客户端 语法错误
  • 401 客户端身份认证失败
  • 403 拒绝访问
  • 404 找不到资源
  • 500 服务器内部错误
  • 501 服务器不支持
  • 502 错误的网关
  • 503 服务器过载
  • 504 网关超时
  • 505 http协议错误

# 2.demo

  • python

    • 服务端

      context.set_code(grpc.StatusCode.NOT_FOUND)
      context.set_details('记录不存在')
      
      1
      2
    • 客户端

            try:
                stub.SayHello(Request())
                print(rsp.status)
            except grpc.RpcError as e:
                d = e.details()
                print(d)
                status_code = e.code()
                print(status_code.name)
                print(status_code.value)
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
  • go

    • 服务端

      st := status.New(codes.InvalidArgument, "invalid username")
      
      1
    • 客户端

    st, ok := status.FromError(err)
    if !ok {
        // Error was not a status error
    }
    st.Message()
    st.Code()
    
    1
    2
    3
    4
    5
    6

# 3.go 和python 互相错误处理

grpc 已经处理了个语言之间处理异常的差异, 互相调用市只需要按照个语言习惯处理就行,.

# 七.调用超时

# 1. python的超时

response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'), timeout=30)
1

# 2. go的超时

ctx, cancel := context.WithTimeout(context.TODO(), time.Second*3)
defer cancel()
 r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
1
2
3
#rpc
上次更新: 2023/05/04, 15:30:48
protobuf进阶
【grpc】1.初识

← protobuf进阶 【grpc】1.初识→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式