刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • rpc

  • grpc

    • 【grpc】1.初识
    • 【grpc】2.上下文 metadata
    • 【grpc】3.健康检查
    • 【grpc】 4.keepalive
    • 【grpc】 5.命名解析
    • 【grpc】 6.中间件&拦截器
    • 【grpc】 7.负载均衡
      • 常用负载均衡方式
      • 负载均衡算法
      • gRPC负载均衡
      • gRPC 使用 console
      • gRPC 使用 etcd
        • server端
        • client端
      • 四层代理和七层代理
      • 最佳实践
    • 【grpc】 8.身份认证
    • 【grpc】 9.超时重试
    • 【grpc】 10.链路追踪
    • 【grpc】11.grpc-gw将gRPC 转 RESTful api
    • 【grpc】12.grpc-gw自定义选项
  • protobuf

  • rpc+grpc
  • grpc
bigox
2023-04-27
目录

【grpc】 7.负载均衡

# 常用负载均衡方式

  1. 代理模式
  2. 客户端负载均衡
  3. 额外负载均衡服务
  1. 代理模式
    • 在客户端和服务器之间提供一层服务转发代理,同等协议的转发,比如 HTTP 请求转发;
    • 代理需要拥有 rpc 的请求和响应临时副本,会消耗更多的资源
    • 代理模式增加 rpc 的延迟,在代理大量的服务(比如存储),会造成任务效率低下
  2. 客户端负载均衡
    • 把负载均衡的逻辑放在客户端中。
    • 客户端自己实现负载均衡策略(比如:轮询,随机分发等)来选择一个后端服务。在这种情况下,客户端通过 name resolution 系统 中拉取服务器列表。
    • 这种方法的缺点之一是要书写多语言,多版本的负载均衡器和维护。这些策略比较复杂。一些算法需要服务器和客户端通信来除了满足用户需要请求的 RPC 调用之外, 还需要额外的支持 RPC 以获得后端服务的运行状态和加载信息等。总而言之复杂度提升。
    • 胖客户端的方式通常是不推荐的,因为这样会导致客户端变得复杂。尤其在跨团队协作中,客户端代码的统一维护会成为挑战。
  3. 额外负载均衡服务
    • 客户端向负载均衡服务发出请求,负载均衡服务负责维护服务器列表的维护,以及实现各种复杂的负载均衡策略,而且通过健康检测和服务器的负载来合理的处理服务器可用性。
    • 基于底层的网络协议转发,以节省资源浪费。
    • 例子是 nginx,kubernetes 中的 services,或者 service mash 中的 sidecar。

# 负载均衡算法

  1. 轮询法

    • 轮询法,很好理解,将请求按照顺序轮流的分配到服务器上,他均衡的对待每一台后端的服务器, 不关心服务器的的连接数和负载情况.以下代码演示了这种算法.
  2. 随机法

    • 通过系统的随机函数,根据后端服务器列表的大小来随机获取其中的一台来访问,随着调用量的增大, 实际效果越来越近似于平均分配到没一台服务器.和轮询的效果类似.
  3. 源地址hash法

    • 源地址hash法的思想是获取客户端访问的ip地址,通过hash函数计算出一个hash值,用该hash值对服 务器列表的大小进行取模运算,得到的值就是要访问的服务器的序号.
  4. 加权轮询法

    • 刚刚有说道过,不同的服务器性能不同,所以不能一概而论,需要给性能低的服务器给比较低的 权重,性能高的给跟高的权重.
  5. 加权随机法

    • 加权随机法算法和加权轮询法类似
  6. 最小连接法

    • 前面我们费尽心思来实现服务消费者请求次数分配的均衡, 我们知道这样做是没错的,可以 为后端的多台服务器平均分配工作量,最大程度地提高服务器的利用率,但是,实际上,请求次 数的均衡并不代表负载的均衡。因此我们需要介绍最小连接数法,最小连接数法比较灵活和智 能,由于后台服务器的配置不尽相同,对请求的处理有快有慢,它正是根据后端服务器当前的连 接情况,动态的选取其中当前积压连接数最少的一台服务器来处理当前请求,尽可能的提高后台 服务器利用率,将负载合理的分流到每一台服务器。

# gRPC负载均衡

官方接口:https://pkg.go.dev/google.golang.org/grpc/balancer

客户端配置地址:https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto

image-20211109174543917

  1. 开始 gRPC client 会发起一个服务器名称解析请求。服务器名称会被解析为若干个 IP地址,每个ip会表明自己是一个服务地址或是负载均衡器地址,同时表明,服务配置 (opens new window)中希望客户端使用哪种负载均衡策略(round_bin, grpclb)

  2. 客户端实例化负载均衡策略。

    • 注意: 任何被解析服务返回的地址如果是一个负载均衡地址,client 将使用 grpclb 策略,无论服务配置中要求使用哪种负载均衡策略。其它的将使用服务配置中要求的策略。如果没有策略,客户端选择第一个可用的服务。
  3. 负载均衡策略为每个服务创建一个子通道【subchannel】。

    • 除了 grpclb,策略解析器会为每个地址创建一个子通道。

    • grpclb 的工作流如下:

      • a. grpclb会在 resolver返回的负载均衡器地址上打开一个流连接。客户端会从这个流中,根据名称获取到需要的服务器地址。

      注意: 在grpclb 策略下,非负载均衡器地址会以回调的方式使用,以防LB策略启动的时候, 没有均衡器可连接。

      • b. 如果负载均衡器配置需要知道服务的负载情况, 则服务器会上报该负载
      • c. 负载均衡器会返回一个服务器列表给gRPC客户端的 grpcLB策略。grpclb会为每个服务创建一个 子通道
  4. 对于每一个 RPC 发送,负载均衡策略决定将它发送到哪个子通道去

grpclb策略的场景下,客户端到服务器的请求是以他们被负载均衡器返回的顺序发送的。如果服务器列表为空,请求将阻塞到有一个非空的服务返回为止。

# gRPC 使用 console

  • 使用grpc-consul-resolver实现consul服务的负载均衡
  • grpc-consul-resolver可以将注册中心的服务拉取到本地然后安装负载均衡的算法进行负载均衡
  • documents
    • grpc负载均衡配置信息: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
    • grpc-consul-resolver: https://github.com/mbobakov/grpc-consul-resolver
package main

import (
	"context"
	"fmt"
	"log"
	"mxshop-api/test/grpclb_test/proto"

	_ "github.com/mbobakov/grpc-consul-resolver" // It's important, 里面有一个init方法会执行

	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial(
		"consul://127.0.0.1:8500/user-srv?wait=14s&tag=srv",
		grpc.WithInsecure(),
    // 配置均衡策略
		grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), 
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	for i := 0; i < 10; i++ {
		userSrvClient := proto.NewUserClient(conn)
		rsp, err := userSrvClient.GetUserList(context.Background(), &proto.PageInfo{
			Pn:    1, 
			PSize: 2,
		})
		if err != nil {
			panic(err)
		}
		for index, data := range rsp.Data {
			fmt.Println(index, data)
		}
	}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

# gRPC 使用 etcd

etcd ClientV3的使用:https://github.com/helios741/myblog/blob/new/learn_go/src/2020/0308_etcd_go_client/README.md

# server端

  • main.go

    	// 注册服务到etcd
    	etcdx.RegisterETCDServer(serverAddr)
    
    1
    2
  • register.go

    package etcdx
    
    import (
    	"context"
    	"fmt"
    	clientv3 "go.etcd.io/etcd/client/v3"
    	"sync/atomic"
    	"time"
    )
    
    var ETCDSchema = "ns"
    var ServiceName = "say_hello_servers"
    
    var ETCDServerPrefix = "/" + ETCDSchema + "/" + ServiceName + "/" // ETCD注册key前缀
    
    var GClient *clientv3.Client // ETCD客户端
    
    var IsCancelRegister int32
    var UnRegisterChan chan bool
    
    func init() {
    	IsCancelRegister = 0
    	UnRegisterChan = make(chan bool)
    }
    
    func RegisterETCDServer(addr string) {
    	// 服务注册
    	registerServer(addr)
    }
    
    func registerServer(addr string) {
    	var err error
    
    	// 创建ETCD的客户端
    	if GClient == nil {
    		GClient, err = newETCDClient()
    		if err != nil {
    			fmt.Println("ectd 客户端创建失败 error=", err.Error())
    			return
    		}
    	}
    	fmt.Println("ectd 客户端创建成功")
    
    	// 定时循环检测,查看向ETCD注册服务是否正常
    	// 每台服务向ETCD注册自己的IP地址,定时检测注册内容是否还在
    	ticker := time.NewTicker(time.Second * time.Duration(5))
    	go func() {
    		defer func() {
    			UnRegisterChan <- true
    		}()
    		for {
    			getResp, err := GClient.Get(context.Background(), ETCDServerPrefix+addr)
    			fmt.Println(getResp.Kvs)
    			fmt.Println(getResp.Count)
    			if err != nil {
    				fmt.Println("etcd出现异常,key获取异常,key=", ETCDServerPrefix+addr, " error=", err.Error())
    			} else if getResp.Count == 0 {
    				fmt.Println("etcd没有目标数据,需要补数据,key=", ETCDServerPrefix+addr)
    				go func() {
    					putData(ETCDServerPrefix+addr, addr)
    				}()
    			} else {
    				fmt.Println("etcd目标数据正常,key=", ETCDServerPrefix+addr)
    			}
    
    			<-ticker.C
    
    			if atomic.LoadInt32(&IsCancelRegister) > 0 {
    				fmt.Println("IsCancelRegister")
    				break
    			}
    		}
    	}()
    
    	return
    }
    
    func newETCDClient() (*clientv3.Client, error) {
    	config := clientv3.Config{
    		Endpoints:   []string{"127.0.0.1:2379"},
    		DialTimeout: 5 * time.Second,
    	}
    
    	return clientv3.New(config)
    }
    
    func putData(key, value string) {
    	leaseResp, err := GClient.Grant(context.Background(), 5)
    	if err != nil {
    		fmt.Println("etcd申请租约失败 key=", key, " error=", err.Error())
    		return
    	}
    
    	defer func() {
    		revokeResp, err := GClient.Revoke(context.Background(), leaseResp.ID)
    		fmt.Println("服务取消注册后,删除租约", revokeResp.Header, err)
    	}()
    
    	_, err = GClient.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID))
    	if err != nil {
    		fmt.Println("etcd写入数据失败 key=", key, " error=", err.Error())
    		return
    	}
    
    	kaRespChan, err := GClient.KeepAlive(context.Background(), leaseResp.ID)
    	if err != nil {
    		fmt.Println("etcd租约续约失败 key=", key, "id=", leaseResp.ID, " error=", err.Error())
    		return
    	}
    
    	// 定期查看续约结果
    	for {
    		select {
    		case respData := <-kaRespChan:
    			if kaRespChan == nil {
    				fmt.Println("管道关闭,出现异常,退出 key=", key)
    				return
    			} else {
    				if respData == nil {
    					fmt.Println("没有数据,可能是etcd关闭、也可能是网络异常,退出 key=", key)
    					return
    				} else {
    					fmt.Println("续约成功 key=", key)
    				}
    			}
    		}
    
    		time.Sleep(1 * time.Second)
    
    		if atomic.LoadInt32(&IsCancelRegister) > 0 {
    			break
    		}
    	}
    
    	return
    }
    
    func UnRegisterETCDServer(addr string) {
    	atomic.StoreInt32(&IsCancelRegister, 1)
    	<-UnRegisterChan
    
    	// 服务取消注册
    	unRegisterServer(addr)
    }
    
    func unRegisterServer(addr string) {
    	var err error
    
    	// 创建ETCD的客户端
    	if GClient == nil {
    		GClient, err = newETCDClient()
    		if err != nil {
    			fmt.Println("ectd 客户端创建失败 error=", err.Error())
    			return
    		}
    	}
    
    	// 删除服务注册数据
    	_, err = GClient.Delete(context.Background(), ETCDServerPrefix+addr)
    	if err != nil {
    		fmt.Println("服务关闭,etcd删除数据失败 key=", ETCDServerPrefix+addr, " error=", err.Error())
    		return
    	} else {
    		fmt.Println("服务关闭,etcd成功删除数据 key=", ETCDServerPrefix+addr)
    		return
    	}
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
  • resovel.go

    package etcdx
    
    import (
    	"context"
    	"fmt"
    	"go.etcd.io/etcd/api/v3/mvccpb"
    	clientv3 "go.etcd.io/etcd/client/v3"
    	"google.golang.org/grpc/resolver"
    	"strings"
    	"time"
    )
    
    /*****************************************************************************************************
    	Builder 是接口类型,用于创建命名解析器,可监视命名空间是否发生变化,其方法有:
    	1) Scheme() string		// 返回解析器支持的方案
    	2) Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)	// 创建解析器
    
    	Resolver 是接口类型,用于监控目标变化,当目标发生变化时,会相应地更新地址、服务配置,其方法有:
    	1) Close()		// 关闭解析器
    	2) ResolveNow(ResolveNowOptions)		// 备用接口,GRPC可以再次调用用于目标的解析
    
    	客户端要实现以上接口,从而实现服务发现、变更
    *****************************************************************************************************/
    
    func NewResolver() resolver.Builder {
    	return &ETCDResolver{rawAddr: "127.0.0.1:2379"}
    }
    
    type ETCDResolver struct {
    	rawAddr      string              // etcd服务地址,多个地址要使用分隔符
    	resolverConn resolver.ClientConn // 解析器链接对象
    }
    
    // 实现Builder接口类型
    func (er *ETCDResolver) Scheme() string {
    	return ETCDSchema
    }
    
    func (er *ETCDResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    	// 构建解析器,解析器只负责对目标的更新,而对目标的监控由用户部分完成,
    	var err error
    	if GClient == nil {
    		GClient, err = clientv3.New(clientv3.Config{
    			Endpoints:   strings.Split(er.rawAddr, ";"),
    			DialTimeout: 5 * time.Second,
    		})
    
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	// 解析器监控变化
    	er.resolverConn = cc
    	fmt.Println("resolver create success")
    	go er.watch("/" + target.Scheme + "/" + target.Endpoint() + "/")
    
    	return er, nil
    }
    
    func (er *ETCDResolver) watch(keyPrefix string) {
    	for {
    		er.watchETCD(keyPrefix)
    		time.Sleep(1 * time.Second)
    	}
    }
    
    func (er *ETCDResolver) watchETCD(keyPrefix string) {
    	defer func() {
    		if err := recover(); err != nil {
    			fmt.Println("watch error =", err)
    		}
    	}()
    
    	er.watchETCDKey(keyPrefix)
    }
    
    func (er *ETCDResolver) watchETCDKey(keyPrefix string) {
    	var addrList []resolver.Address
    
    	// 读取ETCD,获取IP列表
    	getResp, err := GClient.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
    	if err != nil {
    		fmt.Println("解析器读取ETCD,获取IP列表失败 err=", err.Error())
    	} else {
    		for index := range getResp.Kvs {
    			fmt.Println("初始IP地址是:", strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix))
    			addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix)})
    		}
    	}
    
    	er.resolverConn.NewAddress(addrList)
    	// er.resolverConn.UpdateState(resolver.State{Addresses:addrList})
    
    	// 监控ETCD中目标数据的变化
    	watchChan := GClient.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
    	for chanEle := range watchChan {
    		for _, ev := range chanEle.Events {
    			// 根据IP变化情况,解析器更新IP地址列表
    			addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
    			switch ev.Type {
    			case mvccpb.PUT:
    				if !exist(addrList, addr) {
    					addrList = append(addrList, resolver.Address{Addr: addr})
    					er.resolverConn.NewAddress(addrList)
    					fmt.Println("插入新地址 address=", addr)
    				}
    			case mvccpb.DELETE:
    				if s, ok := remove(addrList, addr); ok {
    					addrList = s
    					er.resolverConn.NewAddress(addrList)
    					fmt.Println("删除老地址 address=", addr)
    				}
    			}
    		}
    	}
    }
    
    func exist(l []resolver.Address, addr string) bool {
    	for i := range l {
    		if l[i].Addr == addr {
    			return true
    		}
    	}
    
    	return false
    }
    
    func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
    	for i := range s {
    		if s[i].Addr == addr {
    			s[i] = s[len(s)-1]
    			return s[:len(s)-1], true
    		}
    	}
    
    	return nil, false
    }
    
    // 实现Resolver接口类型
    func (er *ETCDResolver) ResolveNow(rn resolver.ResolveNowOptions) {
    	fmt.Println("ETCDResolver ResolveNow")
    }
    
    func (er *ETCDResolver) Close() {
    	fmt.Println("ETCDResolver Close")
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148

# client端

  • client.go

    // 	"google.golang.org/grpc/resolver"
    
    	// 创建命名解析
    	r := etcdx.NewResolver()
    	resolver.Register(r)
    
    	conn, err := grpc.Dial(
    		r.Scheme()+"://author/"+etcdx.ServiceName,
    		//grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), // 已弃用
    		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"round_robin":{}}]}`), // 推荐使用
     // 推荐使用  配置地址:https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
    		grpc.WithInsecure(),
    	)
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    	if err != nil {
    		fmt.Println(err)
    		return
    	}
    	defer conn.Close()
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22

# 四层代理和七层代理

  • 代理分为:L3/L4(传输层)代理或者 L7(应用层)代理。

    • 在传输层的代理中,服务器终止 TCP 连接,然后在后端列表中再选一个。
    • 应用层(HTTP/2 和 gRPC 桢)只需要简单在客户端连接和后端连接之间复制即可。
  • 相比 L7,L3/L4 做的事情少,延迟短,消耗更少的资源。在 L7 中,LB 终止并解析 HTTP/2 协议。LB 可以根据请求内容分配后端,比如根据头部的 Cookie 值与特定的后端关联, 因此同一个会话的所有请求全部会转发给同一个后端。一旦 LB 选择了一个后端,它会创建一个新的 HTTP/2 连接,然后把客户端收到的 HTTP/2 流转发到所选的后端。使用 HTTP/2 LB 可以在多个后端之间分配来自同一个客户端的流。也就是说,L7 是会同时建立一个与客户端和服务器的流,然后做请求,响应复制转发。

  • 如何选择 L3/L4 还是 L7

  • 需求 推荐
    RPC 在大量连接中负载变化很大 L7
    存储或者计算亲和力很重要 L7,并使用 cookie 或者类似的东西进行后端请求矫正
    最大限度的减少代理中的资源利用率(比功能更重要) L3/L4
    低延迟容忍 L3/L4

# 最佳实践

  1. 场景 1

    • 客户端和服务器之间流量非常大

    • 客户端可以被信赖

    • 推荐:

      • 客户端侧(重)的负载均衡

      • 带 ZooKeeper/Etcd/Consul/Eureka 的客户端

  2. 场景 2

    • 传统逻辑 - 许多客户端连接到代理后面的服务

    • 客户端和服务器之间需要信任边界

    • 推荐:

      • 负载均衡代理

      • L3/L4 LB + GCLB(如果使用 GCP 的话)

      • L3/L4 LB + haproxy - 配置文件 (opens new window)

      • nginx 1.13.10 已经支持了

      • 如果需要会话粘滞性 - 使用 Envoy 代理的 L7 LB

  3. 场景 3

    • 微服务架构 - 数据中心中有 N 个 客户端,M 个服务器

    • 极高的性能要求(低延迟、高流量)

    • 客户端不受信任

    • 推荐:

      • 后备负载均衡

      • 使用 gRPC-LB 协议的客户端 LB

  4. 场景 4

    • 已存在服务网格架构,使用 Linkerd 或者 Istio

    • 推荐:

      • Service Mesh

      • 使用 Istio 或者 Envoy 内置的 LB

#grpc
上次更新: 2023/08/27, 21:33:49
【grpc】 6.中间件&拦截器
【grpc】 8.身份认证

← 【grpc】 6.中间件&拦截器 【grpc】 8.身份认证→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式