刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • Mysql

  • Redis

  • elasticsearch

  • etcd

    • etcd 初识
    • etcd版本差异
    • Raft原理
    • Go操作etcd
    • etcd分布式锁
      • 实现思路
        • 1. prefix
        • 2. lease
        • 3. Revision
        • 4. watch
      • 代码示例
      • 源码分析
        • 1. NewSession
        • 2. NewMutex
        • 3. Lock
        • 3.1 waitDeletes
        • 4. UnLock
  • Database
  • etcd
bigox
2022-07-24
目录

etcd分布式锁

# 实现思路

  • etcd的几种特殊的机制都可以作为分布式锁的基础。etcd的键值对可以作为锁的本体,锁的创建与删除对应键值对的创建与删除。etcd的分布式一致性以及高可用可以保证锁的高可用性。

# 1. prefix

  • 由于etcd支持前缀查找,可以将锁设置成“锁名称”+“唯一id”的格式,保证锁的对称性,即每个客户端只操作自己持有的锁。

# 2. lease

  • 租约机制可以为锁做一个保活操作,在创建锁的时候绑定租约,并定期进行续约,如果获得锁期间客户端意外宕机,则持有的锁会被自动删除,避免了死锁的产生。

# 3. Revision

  • etcd内部维护了一个全局的Revision值,并会随着事务的递增而递增。可以用Revision值的大小来决定获取锁的先后顺序,在上锁的时候已经决定了获取锁先后顺序,后续有客户端释放锁也不会产生惊群效应。

# 4. watch

  • watch机制可以用于监听锁的删除事件,不必使用忙轮询的方式查看是否释放了锁,更加高效。同时,在watch时候可以通过Revision来进行监听,只需要监听距离自己最近而且比自己小的一个Revision就可以做到锁的实时获取。

# 代码示例

func main() {
    //初始化etcd客户端
    cli, _ := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:23790"},
        DialTimeout: time.Second,
    })
    //创建一个session,并根据业务情况设置锁的ttl
    s, _ := concurrency.NewSession(cli, concurrency.WithTTL(3))
    defer s.Close()
    //初始化一个锁的实例,并进行加锁解锁操作。
    mu := concurrency.NewMutex(s, "mutex-linugo")
    if err := mu.Lock(context.TODO()); err != nil {
        log.Fatal("m lock err: ", err)
    }
    //do something
    if err := mu.Unlock(context.TODO()); err != nil {
        log.Fatal("m unlock err: ", err)
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 源码分析

# 1. NewSession

  • 在调用NewSession方法时候实际上是初始化了一个用户指定行为的租约(行为可以是指定ttl,或者复用其他的lease等),并异步进行keepalive。

    type Mutex struct {
        s *Session //保存的租约相关的信息
    
        pfx   string //锁的名称,key的前缀
        myKey string //锁完整的key = pfx/sessionID
        myRev int64  //自己的版本号
        hdr   *pb.ResponseHeader
    }
    
    func NewMutex(s *Session, pfx string) *Mutex {
        return &Mutex{s, pfx + "/", "", -1, nil}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12

# 2. NewMutex

  • NewMutex实际上创建了一个锁的数据结构,该结构可以保存一些锁的信息,入参的“mutex-linugo”只是一个key的前缀,还有后续要创建的完整key,revision等信息。

# 3. Lock

  • Lock

    func (m *Mutex) Lock(ctx context.Context) error {
        //首先尝试获取锁
        resp, err := m.tryAcquire(ctx)
        if err != nil {
            return err
        }
        ......
    }
    
    func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
        s := m.s
        client := m.s.Client()
        // 完整key是前缀名称加租约ID,由于不同进程生成的不同租约,所以锁互不相同
        m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
        // cmp通过比较createRevision是否为0判断当前的key是不是第一次创建
        cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
        // put会把key绑定上租约并存储,同样prefix代表锁的是同一个资源。自动续约
        put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
        //get会获取当前key的相关信息
        get := v3.OpGet(m.myKey)
        //getOwner是通过前缀来进行范围查找,WithFirstCreate() 筛选出当前存在的最小revision对应的值
        getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
        resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
        if err != nil {
            return nil, err
        }
        // 将该事务的 revision 赋值到锁的myRev字段
        m.myRev = resp.Header.Revision
        if !resp.Succeeded {
            m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
        }
        return resp, nil
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
  • 在获取锁的时候,通过事务操作来尝试加锁。

    • 如果当前的key是第一次创建,则将key绑定租约并存储,否则获取当前的key详细信息。

    • getOwner通过前缀来进行查找最小revision对应的值,目的是获取当前锁的持有者(如果最小Revison的key释放锁,则该key会被删除,所以最小Revision的key就是当前锁的持有者)。

    • !resp.Succeeded代表key不是第一次创建,则之前执行的是get,获取该key创建时候的revision并赋值到锁的myRev字段。

  • 回到主函数,目前etcd中已经存有锁相关键值对信息了,后面会通过比较Revision来判断自己获得了锁还是需要等待锁,如果自己的myRev与ownerKey的Revsion相同,说明自己就是锁的持有者。

  • Lock

    func (m *Mutex) Lock(ctx context.Context) error {
        resp, err := m.tryAcquire(ctx)
        if err != nil {
            return err
        }
        //ownerKey就是当前持有锁的值
        ownerKey := resp.Responses[1].GetResponseRange().Kvs
        //如果ownerKey的长度为0或者持有者的Revision与自己的Revision相同,
        //说明自己持有锁,可以直接返回,并对共享资源进行操作
        if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
            m.hdr = resp.Header
            return nil
        }
        ......
        //等待锁的释放
        client := m.s.Client()
        _, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
        if werr != nil {
            m.Unlock(client.Ctx())
            return werr
        }
        //确保session没有过期
        gresp, werr := client.Get(ctx, m.myKey)
        if werr != nil {
            m.Unlock(client.Ctx())
            return werr
        }
    
        if len(gresp.Kvs) == 0 {
            return ErrSessionExpired
        }
        m.hdr = gresp.Header
    
        return nil
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35

# 3.1 waitDeletes

  • 如果没有获得锁,就需要等待前面锁的释放,这里主要用到watch机制来监听,getOpts行为参数用于获取前一个锁的信息。

    func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
        //getOpts会通过两个Option函数获取小于传入的maxCreateRev的Revision的key集合且找出集合中最大的Revison对应的key
        //主要是用于找到前一个上锁的key,进而可以watch该key的删除事件
        getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
        for {
            //get通过getOpts的动作来获取键值对
            resp, err := client.Get(ctx, pfx, getOpts...)
            if err != nil {
                return nil, err
            }
            //如果长度是0,说明key被删除,前面的锁已经被释放了,可以直接返回
            if len(resp.Kvs) == 0 {
                return resp.Header, nil
            }
            lastKey := string(resp.Kvs[0].Key)
            //否则通过watch监听上一个锁的删除事件
            if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
                return nil, err
            }
        }
    }
    
    func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
        cctx, cancel := context.WithCancel(ctx)
        defer cancel()
    
        var wr v3.WatchResponse
        //通过Revsion来watch key,也就是前一个锁
        wch := client.Watch(cctx, key, v3.WithRev(rev))
        for wr = range wch {
            for _, ev := range wr.Events {
                 //监听Delete事件
                if ev.Type == mvccpb.DELETE {
                    return nil
                }
            }
        }
        if err := wr.Err(); err != nil {
            return err
        }
        if err := ctx.Err(); err != nil {
            return err
        }
        return fmt.Errorf("lost watcher waiting for delete")
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
  • waitDeletes正常返回后该进程会获得锁,进入操作共享资源。

# 4. UnLock

  • 解锁操作会直接删除对应的kv,这会触发下一个锁的获取。

    func (m *Mutex) Unlock(ctx context.Context) error {
        client := m.s.Client()
        if _, err := client.Delete(ctx, m.myKey); err != nil {
            return err
        }
        m.myKey = "\x00"
        m.myRev = -1
        return nil
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9

    图片

上次更新: 2023/04/16, 18:35:33
Go操作etcd

← Go操作etcd

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式