【grpc】 7.负载均衡
# 常用负载均衡方式
- 代理模式
- 客户端负载均衡
- 额外负载均衡服务
- 代理模式
- 在客户端和服务器之间提供一层服务转发代理,同等协议的转发,比如 HTTP 请求转发;
- 代理需要拥有 rpc 的请求和响应临时副本,会消耗更多的资源
- 代理模式增加 rpc 的延迟,在代理大量的服务(比如存储),会造成任务效率低下
- 客户端负载均衡
- 把负载均衡的逻辑放在客户端中。
- 客户端自己实现负载均衡策略(比如:轮询,随机分发等)来选择一个后端服务。在这种情况下,客户端通过 name resolution 系统 中拉取服务器列表。
- 这种方法的缺点之一是要书写多语言,多版本的负载均衡器和维护。这些策略比较复杂。一些算法需要服务器和客户端通信来除了满足用户需要请求的 RPC 调用之外, 还需要额外的支持 RPC 以获得后端服务的运行状态和加载信息等。总而言之复杂度提升。
- 胖客户端的方式通常是不推荐的,因为这样会导致客户端变得复杂。尤其在跨团队协作中,客户端代码的统一维护会成为挑战。
- 额外负载均衡服务
- 客户端向负载均衡服务发出请求,负载均衡服务负责维护服务器列表的维护,以及实现各种复杂的负载均衡策略,而且通过健康检测和服务器的负载来合理的处理服务器可用性。
- 基于底层的网络协议转发,以节省资源浪费。
- 例子是 nginx,kubernetes 中的 services,或者 service mash 中的 sidecar。
# 负载均衡算法
轮询法
- 轮询法,很好理解,将请求按照顺序轮流的分配到服务器上,他均衡的对待每一台后端的服务器, 不关心服务器的的连接数和负载情况.以下代码演示了这种算法.
随机法
- 通过系统的随机函数,根据后端服务器列表的大小来随机获取其中的一台来访问,随着调用量的增大, 实际效果越来越近似于平均分配到没一台服务器.和轮询的效果类似.
源地址hash法
- 源地址hash法的思想是获取客户端访问的ip地址,通过hash函数计算出一个hash值,用该hash值对服 务器列表的大小进行取模运算,得到的值就是要访问的服务器的序号.
加权轮询法
- 刚刚有说道过,不同的服务器性能不同,所以不能一概而论,需要给性能低的服务器给比较低的 权重,性能高的给跟高的权重.
加权随机法
- 加权随机法算法和加权轮询法类似
最小连接法
- 前面我们费尽心思来实现服务消费者请求次数分配的均衡, 我们知道这样做是没错的,可以 为后端的多台服务器平均分配工作量,最大程度地提高服务器的利用率,但是,实际上,请求次 数的均衡并不代表负载的均衡。因此我们需要介绍最小连接数法,最小连接数法比较灵活和智 能,由于后台服务器的配置不尽相同,对请求的处理有快有慢,它正是根据后端服务器当前的连 接情况,动态的选取其中当前积压连接数最少的一台服务器来处理当前请求,尽可能的提高后台 服务器利用率,将负载合理的分流到每一台服务器。
# gRPC负载均衡
官方接口:https://pkg.go.dev/google.golang.org/grpc/balancer
客户端配置地址:https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
开始 gRPC client 会发起一个服务器名称解析请求。服务器名称会被解析为若干个 IP地址,每个ip会表明自己是一个服务地址或是负载均衡器地址,同时表明,服务配置 (opens new window)中希望客户端使用哪种负载均衡策略(round_bin, grpclb)
客户端实例化负载均衡策略。
- 注意: 任何被解析服务返回的地址如果是一个负载均衡地址,client 将使用 grpclb 策略,无论服务配置中要求使用哪种负载均衡策略。其它的将使用服务配置中要求的策略。如果没有策略,客户端选择第一个可用的服务。
负载均衡策略为每个服务创建一个子通道【subchannel】。
除了 grpclb,策略解析器会为每个地址创建一个子通道。
grpclb 的工作流如下:
- a. grpclb会在 resolver返回的负载均衡器地址上打开一个流连接。客户端会从这个流中,根据名称获取到需要的服务器地址。
注意: 在grpclb 策略下,非负载均衡器地址会以回调的方式使用,以防LB策略启动的时候, 没有均衡器可连接。
- b. 如果负载均衡器配置需要知道服务的负载情况, 则服务器会上报该负载
- c. 负载均衡器会返回一个服务器列表给gRPC客户端的 grpcLB策略。grpclb会为每个服务创建一个 子通道
对于每一个 RPC 发送,负载均衡策略决定将它发送到哪个子通道去
grpclb策略的场景下,客户端到服务器的请求是以他们被负载均衡器返回的顺序发送的。如果服务器列表为空,请求将阻塞到有一个非空的服务返回为止。
# gRPC 使用 console
- 使用
grpc-consul-resolver
实现consul服务的负载均衡 - grpc-consul-resolver可以将注册中心的服务拉取到本地然后安装负载均衡的算法进行负载均衡
- documents
- grpc负载均衡配置信息: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
- grpc-consul-resolver: https://github.com/mbobakov/grpc-consul-resolver
package main
import (
"context"
"fmt"
"log"
"mxshop-api/test/grpclb_test/proto"
_ "github.com/mbobakov/grpc-consul-resolver" // It's important, 里面有一个init方法会执行
"google.golang.org/grpc"
)
func main() {
conn, err := grpc.Dial(
"consul://127.0.0.1:8500/user-srv?wait=14s&tag=srv",
grpc.WithInsecure(),
// 配置均衡策略
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
for i := 0; i < 10; i++ {
userSrvClient := proto.NewUserClient(conn)
rsp, err := userSrvClient.GetUserList(context.Background(), &proto.PageInfo{
Pn: 1,
PSize: 2,
})
if err != nil {
panic(err)
}
for index, data := range rsp.Data {
fmt.Println(index, data)
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# gRPC 使用 etcd
etcd ClientV3的使用:https://github.com/helios741/myblog/blob/new/learn_go/src/2020/0308_etcd_go_client/README.md
# server端
main.go
// 注册服务到etcd etcdx.RegisterETCDServer(serverAddr)
1
2register.go
package etcdx import ( "context" "fmt" clientv3 "go.etcd.io/etcd/client/v3" "sync/atomic" "time" ) var ETCDSchema = "ns" var ServiceName = "say_hello_servers" var ETCDServerPrefix = "/" + ETCDSchema + "/" + ServiceName + "/" // ETCD注册key前缀 var GClient *clientv3.Client // ETCD客户端 var IsCancelRegister int32 var UnRegisterChan chan bool func init() { IsCancelRegister = 0 UnRegisterChan = make(chan bool) } func RegisterETCDServer(addr string) { // 服务注册 registerServer(addr) } func registerServer(addr string) { var err error // 创建ETCD的客户端 if GClient == nil { GClient, err = newETCDClient() if err != nil { fmt.Println("ectd 客户端创建失败 error=", err.Error()) return } } fmt.Println("ectd 客户端创建成功") // 定时循环检测,查看向ETCD注册服务是否正常 // 每台服务向ETCD注册自己的IP地址,定时检测注册内容是否还在 ticker := time.NewTicker(time.Second * time.Duration(5)) go func() { defer func() { UnRegisterChan <- true }() for { getResp, err := GClient.Get(context.Background(), ETCDServerPrefix+addr) fmt.Println(getResp.Kvs) fmt.Println(getResp.Count) if err != nil { fmt.Println("etcd出现异常,key获取异常,key=", ETCDServerPrefix+addr, " error=", err.Error()) } else if getResp.Count == 0 { fmt.Println("etcd没有目标数据,需要补数据,key=", ETCDServerPrefix+addr) go func() { putData(ETCDServerPrefix+addr, addr) }() } else { fmt.Println("etcd目标数据正常,key=", ETCDServerPrefix+addr) } <-ticker.C if atomic.LoadInt32(&IsCancelRegister) > 0 { fmt.Println("IsCancelRegister") break } } }() return } func newETCDClient() (*clientv3.Client, error) { config := clientv3.Config{ Endpoints: []string{"127.0.0.1:2379"}, DialTimeout: 5 * time.Second, } return clientv3.New(config) } func putData(key, value string) { leaseResp, err := GClient.Grant(context.Background(), 5) if err != nil { fmt.Println("etcd申请租约失败 key=", key, " error=", err.Error()) return } defer func() { revokeResp, err := GClient.Revoke(context.Background(), leaseResp.ID) fmt.Println("服务取消注册后,删除租约", revokeResp.Header, err) }() _, err = GClient.Put(context.Background(), key, value, clientv3.WithLease(leaseResp.ID)) if err != nil { fmt.Println("etcd写入数据失败 key=", key, " error=", err.Error()) return } kaRespChan, err := GClient.KeepAlive(context.Background(), leaseResp.ID) if err != nil { fmt.Println("etcd租约续约失败 key=", key, "id=", leaseResp.ID, " error=", err.Error()) return } // 定期查看续约结果 for { select { case respData := <-kaRespChan: if kaRespChan == nil { fmt.Println("管道关闭,出现异常,退出 key=", key) return } else { if respData == nil { fmt.Println("没有数据,可能是etcd关闭、也可能是网络异常,退出 key=", key) return } else { fmt.Println("续约成功 key=", key) } } } time.Sleep(1 * time.Second) if atomic.LoadInt32(&IsCancelRegister) > 0 { break } } return } func UnRegisterETCDServer(addr string) { atomic.StoreInt32(&IsCancelRegister, 1) <-UnRegisterChan // 服务取消注册 unRegisterServer(addr) } func unRegisterServer(addr string) { var err error // 创建ETCD的客户端 if GClient == nil { GClient, err = newETCDClient() if err != nil { fmt.Println("ectd 客户端创建失败 error=", err.Error()) return } } // 删除服务注册数据 _, err = GClient.Delete(context.Background(), ETCDServerPrefix+addr) if err != nil { fmt.Println("服务关闭,etcd删除数据失败 key=", ETCDServerPrefix+addr, " error=", err.Error()) return } else { fmt.Println("服务关闭,etcd成功删除数据 key=", ETCDServerPrefix+addr) return } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168resovel.go
package etcdx import ( "context" "fmt" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "google.golang.org/grpc/resolver" "strings" "time" ) /***************************************************************************************************** Builder 是接口类型,用于创建命名解析器,可监视命名空间是否发生变化,其方法有: 1) Scheme() string // 返回解析器支持的方案 2) Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error) // 创建解析器 Resolver 是接口类型,用于监控目标变化,当目标发生变化时,会相应地更新地址、服务配置,其方法有: 1) Close() // 关闭解析器 2) ResolveNow(ResolveNowOptions) // 备用接口,GRPC可以再次调用用于目标的解析 客户端要实现以上接口,从而实现服务发现、变更 *****************************************************************************************************/ func NewResolver() resolver.Builder { return &ETCDResolver{rawAddr: "127.0.0.1:2379"} } type ETCDResolver struct { rawAddr string // etcd服务地址,多个地址要使用分隔符 resolverConn resolver.ClientConn // 解析器链接对象 } // 实现Builder接口类型 func (er *ETCDResolver) Scheme() string { return ETCDSchema } func (er *ETCDResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { // 构建解析器,解析器只负责对目标的更新,而对目标的监控由用户部分完成, var err error if GClient == nil { GClient, err = clientv3.New(clientv3.Config{ Endpoints: strings.Split(er.rawAddr, ";"), DialTimeout: 5 * time.Second, }) if err != nil { return nil, err } } // 解析器监控变化 er.resolverConn = cc fmt.Println("resolver create success") go er.watch("/" + target.Scheme + "/" + target.Endpoint() + "/") return er, nil } func (er *ETCDResolver) watch(keyPrefix string) { for { er.watchETCD(keyPrefix) time.Sleep(1 * time.Second) } } func (er *ETCDResolver) watchETCD(keyPrefix string) { defer func() { if err := recover(); err != nil { fmt.Println("watch error =", err) } }() er.watchETCDKey(keyPrefix) } func (er *ETCDResolver) watchETCDKey(keyPrefix string) { var addrList []resolver.Address // 读取ETCD,获取IP列表 getResp, err := GClient.Get(context.Background(), keyPrefix, clientv3.WithPrefix()) if err != nil { fmt.Println("解析器读取ETCD,获取IP列表失败 err=", err.Error()) } else { for index := range getResp.Kvs { fmt.Println("初始IP地址是:", strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix)) addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[index].Key), keyPrefix)}) } } er.resolverConn.NewAddress(addrList) // er.resolverConn.UpdateState(resolver.State{Addresses:addrList}) // 监控ETCD中目标数据的变化 watchChan := GClient.Watch(context.Background(), keyPrefix, clientv3.WithPrefix()) for chanEle := range watchChan { for _, ev := range chanEle.Events { // 根据IP变化情况,解析器更新IP地址列表 addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix) switch ev.Type { case mvccpb.PUT: if !exist(addrList, addr) { addrList = append(addrList, resolver.Address{Addr: addr}) er.resolverConn.NewAddress(addrList) fmt.Println("插入新地址 address=", addr) } case mvccpb.DELETE: if s, ok := remove(addrList, addr); ok { addrList = s er.resolverConn.NewAddress(addrList) fmt.Println("删除老地址 address=", addr) } } } } } func exist(l []resolver.Address, addr string) bool { for i := range l { if l[i].Addr == addr { return true } } return false } func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) { for i := range s { if s[i].Addr == addr { s[i] = s[len(s)-1] return s[:len(s)-1], true } } return nil, false } // 实现Resolver接口类型 func (er *ETCDResolver) ResolveNow(rn resolver.ResolveNowOptions) { fmt.Println("ETCDResolver ResolveNow") } func (er *ETCDResolver) Close() { fmt.Println("ETCDResolver Close") }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# client端
client.go
// "google.golang.org/grpc/resolver" // 创建命名解析 r := etcdx.NewResolver() resolver.Register(r) conn, err := grpc.Dial( r.Scheme()+"://author/"+etcdx.ServiceName, //grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`), // 已弃用 grpc.WithDefaultServiceConfig(`{"loadBalancingConfig":[{"round_robin":{}}]}`), // 推荐使用 // 推荐使用 配置地址:https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto grpc.WithInsecure(), ) if err != nil { fmt.Println(err) return } if err != nil { fmt.Println(err) return } defer conn.Close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 四层代理和七层代理
代理分为:L3/L4(传输层)代理或者 L7(应用层)代理。
- 在传输层的代理中,服务器终止 TCP 连接,然后在后端列表中再选一个。
- 应用层(HTTP/2 和 gRPC 桢)只需要简单在客户端连接和后端连接之间复制即可。
相比 L7,L3/L4 做的事情少,延迟短,消耗更少的资源。在 L7 中,LB 终止并解析 HTTP/2 协议。LB 可以根据请求内容分配后端,比如根据头部的 Cookie 值与特定的后端关联, 因此同一个会话的所有请求全部会转发给同一个后端。一旦 LB 选择了一个后端,它会创建一个新的 HTTP/2 连接,然后把客户端收到的 HTTP/2 流转发到所选的后端。使用 HTTP/2 LB 可以在多个后端之间分配来自同一个客户端的流。也就是说,L7 是会同时建立一个与客户端和服务器的流,然后做请求,响应复制转发。
如何选择 L3/L4 还是 L7
需求 推荐 RPC 在大量连接中负载变化很大 L7 存储或者计算亲和力很重要 L7,并使用 cookie 或者类似的东西进行后端请求矫正 最大限度的减少代理中的资源利用率(比功能更重要) L3/L4 低延迟容忍 L3/L4
# 最佳实践
场景 1
客户端和服务器之间流量非常大
客户端可以被信赖
推荐:
客户端侧(重)的负载均衡
带 ZooKeeper/Etcd/Consul/Eureka 的客户端
场景 2
传统逻辑 - 许多客户端连接到代理后面的服务
客户端和服务器之间需要信任边界
推荐:
负载均衡代理
L3/L4 LB + GCLB(如果使用 GCP 的话)
L3/L4 LB + haproxy - 配置文件 (opens new window)
nginx 1.13.10 已经支持了
如果需要会话粘滞性 - 使用 Envoy 代理的 L7 LB
场景 3
微服务架构 - 数据中心中有 N 个 客户端,M 个服务器
极高的性能要求(低延迟、高流量)
客户端不受信任
推荐:
后备负载均衡
使用 gRPC-LB 协议的客户端 LB
场景 4
已存在服务网格架构,使用 Linkerd 或者 Istio
推荐:
Service Mesh
使用 Istio 或者 Envoy 内置的 LB