singleflight实现原理
SingleFlight
的作用是在处理多个goroutine
同时调用同一个函数的时候,只让一个goroutine
去实际调用这个函数,等到这个goroutine
返回结果的时候,再把结果返回给其他几个同时调用了相同函数的goroutine
,这样可以减少并发调用的数量。
# 应用场景
singleflight 包主要是用来做并发控制
防止 缓存击穿: 缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。
# 使用方法
singleflight 接口
//Do方法,传入key,以及回调函数,如果key相同,fn方法只会执行一次,同步等待 //返回值v:表示fn执行结果 //返回值err:表示fn的返回的err //返回值shared:表示是否是真实fn返回的还是从保存的map[key]返回的,也就是共享的 func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { //DoChan方法类似Do方法,只是返回的是一个chan func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { //设计Forget 控制key关联的值是否失效,默认以上两个方法只要fn方法执行完成后,内部维护的fn的值也删除(即并发结束后就失效了) func (g *Group) Forget(key string)
1
2
3
4
5
6
7
8
9demo
package main import ( "golang.org/x/sync/singleflight" "log" "time" ) func main() { var singleSetCache singleflight.Group getAndSetCache := func(requestID int, cacheKey string) (string, error) { log.Printf("request %v start to get and set cache...", requestID) value, _, _ := singleSetCache.Do(cacheKey, func() (ret interface{}, err error) { //do 的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB log.Printf("request %v is setting cache...", requestID) time.Sleep(3 * time.Second) log.Printf("request %v set cache success!", requestID) return "VALUE", nil }) return value.(string), nil } cacheKey := "cacheKey" for i := 1; i < 10; i++ { go func(requestID int) { value, _ := getAndSetCache(requestID, cacheKey) log.Printf("request %v get value: %v", requestID, value) }(i) } time.Sleep(20 * time.Second) } /* 输出 2022/07/08 17:43:45 request 4 start to get and set cache... 2022/07/08 17:43:45 request 4 is setting cache... 2022/07/08 17:43:45 request 1 start to get and set cache... 2022/07/08 17:43:45 request 3 start to get and set cache... 2022/07/08 17:43:45 request 9 start to get and set cache... 2022/07/08 17:43:45 request 6 start to get and set cache... 2022/07/08 17:43:45 request 5 start to get and set cache... 2022/07/08 17:43:45 request 7 start to get and set cache... 2022/07/08 17:43:45 request 8 start to get and set cache... 2022/07/08 17:43:45 request 2 start to get and set cache... 2022/07/08 17:43:48 request 4 set cache success! 2022/07/08 17:43:48 request 4 get value: VALUE 2022/07/08 17:43:48 request 2 get value: VALUE 2022/07/08 17:43:48 request 5 get value: VALUE 2022/07/08 17:43:48 request 7 get value: VALUE 2022/07/08 17:43:48 request 8 get value: VALUE 2022/07/08 17:43:48 request 6 get value: VALUE 2022/07/08 17:43:48 request 9 get value: VALUE 2022/07/08 17:43:48 request 1 get value: VALUE 2022/07/08 17:43:48 request 3 get value: VALUE */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# 实现原理
# 1. 核心数据结构
// group
type Group struct {
mu sync.Mutex
m map[string]*call
}
// call
type call struct {
wg sync.WaitGroup
val interface{}
err error
dups int
chans []chan<- Result
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 2. 核心方法
SingleFlight
定义一个call
结构体,每个结构体都保存了fn
调用对应的信息。Do方法的执行逻辑是每次调用Do方法都会先去获取互斥锁,随后判断在映射表里是否已经有Key对应的
fn
函数调用信息的call
结构体。- 当不存在时,证明是这个Key的第一次请求,那么会初始化一个
call
结构体指针,增加SingleFlight
内部持有的sync.WaitGroup
计数器到1。释放互斥锁,然后阻塞的等待doCall方法执行fn
函数的返回结果 - 当存在时,增加
call
结构体内代表fn
重复调用次数的计数器dups
,释放互斥锁,然后使用WaitGroup
等待fn
函数执行完成。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { // 存在取值返回 c.dups++ ... return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) // 不存在调用 docall return c.val, c.err, c.dups > 0 }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18- 当不存在时,证明是这个Key的第一次请求,那么会初始化一个
call
结构体的val
和err
两个字段只会在doCall
方法中执行fn
有返回结果后才赋值,所以当doCall
方法 和WaitGroup.Wait
返回时,函数调用的结果和错误会返回给Do方法的所有调用者。func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false defer func() { // the given function invoked runtime.Goexit if !normalReturn && !recovered { c.err = errGoexit } c.wg.Done() g.mu.Lock() defer g.mu.Unlock() if !c.forgotten { delete(g.m, key) } if e, ok := c.err.(*panicError); ok { ... } else { // Normal return for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }() func() { defer func() { if !normalReturn { if r := recover(); r != nil { c.err = newPanicError(r) } } }() c.val, c.err = fn() // 真正的执行调用函数 normalReturn = true }() if !normalReturn { recovered = true } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# 总结
- Group中存入map, 新到的请求按照key存入map, mutex保证并发安全
- 如果map中已经存在相同的key, 会wg.wait()等待doCall执行完毕
- doCall真正的调用请求方法,只会有一个请求真正执行,执行成功后wg.Done唤醒wg.wait()中的请求
- 全部返回最终的value.
上次更新: 2023/04/16, 18:35:33