etcd分布式锁
# 实现思路
- etcd的几种特殊的机制都可以作为分布式锁的基础。etcd的键值对可以作为锁的本体,锁的创建与删除对应键值对的创建与删除。etcd的分布式一致性以及高可用可以保证锁的高可用性。
# 1. prefix
- 由于etcd支持前缀查找,可以将锁设置成“锁名称”+“唯一id”的格式,保证锁的对称性,即每个客户端只操作自己持有的锁。
# 2. lease
- 租约机制可以为锁做一个保活操作,在创建锁的时候绑定租约,并定期进行续约,如果获得锁期间客户端意外宕机,则持有的锁会被自动删除,避免了死锁的产生。
# 3. Revision
- etcd内部维护了一个全局的Revision值,并会随着事务的递增而递增。可以用Revision值的大小来决定获取锁的先后顺序,在上锁的时候已经决定了获取锁先后顺序,后续有客户端释放锁也不会产生惊群效应。
# 4. watch
- watch机制可以用于监听锁的删除事件,不必使用忙轮询的方式查看是否释放了锁,更加高效。同时,在watch时候可以通过Revision来进行监听,只需要监听距离自己最近而且比自己小的一个Revision就可以做到锁的实时获取。
# 代码示例
func main() {
//初始化etcd客户端
cli, _ := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:23790"},
DialTimeout: time.Second,
})
//创建一个session,并根据业务情况设置锁的ttl
s, _ := concurrency.NewSession(cli, concurrency.WithTTL(3))
defer s.Close()
//初始化一个锁的实例,并进行加锁解锁操作。
mu := concurrency.NewMutex(s, "mutex-linugo")
if err := mu.Lock(context.TODO()); err != nil {
log.Fatal("m lock err: ", err)
}
//do something
if err := mu.Unlock(context.TODO()); err != nil {
log.Fatal("m unlock err: ", err)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 源码分析
# 1. NewSession
在调用NewSession方法时候实际上是初始化了一个用户指定行为的租约(行为可以是指定ttl,或者复用其他的lease等),并异步进行keepalive。
type Mutex struct { s *Session //保存的租约相关的信息 pfx string //锁的名称,key的前缀 myKey string //锁完整的key = pfx/sessionID myRev int64 //自己的版本号 hdr *pb.ResponseHeader } func NewMutex(s *Session, pfx string) *Mutex { return &Mutex{s, pfx + "/", "", -1, nil} }
1
2
3
4
5
6
7
8
9
10
11
12
# 2. NewMutex
NewMutex
实际上创建了一个锁的数据结构,该结构可以保存一些锁的信息,入参的“mutex-linugo”只是一个key的前缀,还有后续要创建的完整key,revision等信息。
# 3. Lock
Lock
func (m *Mutex) Lock(ctx context.Context) error { //首先尝试获取锁 resp, err := m.tryAcquire(ctx) if err != nil { return err } ...... } func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { s := m.s client := m.s.Client() // 完整key是前缀名称加租约ID,由于不同进程生成的不同租约,所以锁互不相同 m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) // cmp通过比较createRevision是否为0判断当前的key是不是第一次创建 cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put会把key绑定上租约并存储,同样prefix代表锁的是同一个资源。自动续约 put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) //get会获取当前key的相关信息 get := v3.OpGet(m.myKey) //getOwner是通过前缀来进行范围查找,WithFirstCreate() 筛选出当前存在的最小revision对应的值 getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() if err != nil { return nil, err } // 将该事务的 revision 赋值到锁的myRev字段 m.myRev = resp.Header.Revision if !resp.Succeeded { m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision } return resp, nil }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33在获取锁的时候,通过事务操作来尝试加锁。
如果当前的key是第一次创建,则将key绑定租约并存储,否则获取当前的key详细信息。
getOwner通过前缀来进行查找最小revision对应的值,目的是获取当前锁的持有者(如果最小Revison的key释放锁,则该key会被删除,所以最小Revision的key就是当前锁的持有者)。
!resp.Succeeded代表key不是第一次创建,则之前执行的是get,获取该key创建时候的revision并赋值到锁的myRev字段。
回到主函数,目前etcd中已经存有锁相关键值对信息了,后面会通过比较Revision来判断自己获得了锁还是需要等待锁,如果自己的myRev与ownerKey的Revsion相同,说明自己就是锁的持有者。
Lock
func (m *Mutex) Lock(ctx context.Context) error { resp, err := m.tryAcquire(ctx) if err != nil { return err } //ownerKey就是当前持有锁的值 ownerKey := resp.Responses[1].GetResponseRange().Kvs //如果ownerKey的长度为0或者持有者的Revision与自己的Revision相同, //说明自己持有锁,可以直接返回,并对共享资源进行操作 if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } ...... //等待锁的释放 client := m.s.Client() _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) if werr != nil { m.Unlock(client.Ctx()) return werr } //确保session没有过期 gresp, werr := client.Get(ctx, m.myKey) if werr != nil { m.Unlock(client.Ctx()) return werr } if len(gresp.Kvs) == 0 { return ErrSessionExpired } m.hdr = gresp.Header return nil }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 3.1 waitDeletes
如果没有获得锁,就需要等待前面锁的释放,这里主要用到watch机制来监听,getOpts行为参数用于获取前一个锁的信息。
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) { //getOpts会通过两个Option函数获取小于传入的maxCreateRev的Revision的key集合且找出集合中最大的Revison对应的key //主要是用于找到前一个上锁的key,进而可以watch该key的删除事件 getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev)) for { //get通过getOpts的动作来获取键值对 resp, err := client.Get(ctx, pfx, getOpts...) if err != nil { return nil, err } //如果长度是0,说明key被删除,前面的锁已经被释放了,可以直接返回 if len(resp.Kvs) == 0 { return resp.Header, nil } lastKey := string(resp.Kvs[0].Key) //否则通过watch监听上一个锁的删除事件 if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil { return nil, err } } } func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error { cctx, cancel := context.WithCancel(ctx) defer cancel() var wr v3.WatchResponse //通过Revsion来watch key,也就是前一个锁 wch := client.Watch(cctx, key, v3.WithRev(rev)) for wr = range wch { for _, ev := range wr.Events { //监听Delete事件 if ev.Type == mvccpb.DELETE { return nil } } } if err := wr.Err(); err != nil { return err } if err := ctx.Err(); err != nil { return err } return fmt.Errorf("lost watcher waiting for delete") }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45waitDeletes正常返回后该进程会获得锁,进入操作共享资源。
# 4. UnLock
解锁操作会直接删除对应的kv,这会触发下一个锁的获取。
func (m *Mutex) Unlock(ctx context.Context) error { client := m.s.Client() if _, err := client.Delete(ctx, m.myKey); err != nil { return err } m.myKey = "\x00" m.myRev = -1 return nil }
1
2
3
4
5
6
7
8
9
上次更新: 2023/04/16, 18:35:33