刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • go语言基础

  • go语言进阶

  • go语言实现原理

    • channel 实现原理
    • string 实现原理
    • iota
    • slice 实现原理
    • map 实现原理
    • sync.Map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁 CAS 实现原理
    • defer实现原理
    • singleflight实现原理
      • 应用场景
      • 使用方法
      • 实现原理
        • 1. 核心数据结构
        • 2. 核心方法
      • 总结
    • timerate令牌桶限流 实现原理
  • gin框架

  • gorm

  • go测试

  • Go语言
  • go语言实现原理
bigox
2022-07-08
目录

singleflight实现原理

SingleFlight的作用是在处理多个goroutine同时调用同一个函数的时候,只让一个goroutine去实际调用这个函数,等到这个goroutine返回结果的时候,再把结果返回给其他几个同时调用了相同函数的goroutine,这样可以减少并发调用的数量。

# 应用场景

  1. singleflight 包主要是用来做并发控制

  2. 防止 缓存击穿: 缓存在某个时间点过期的时候,恰好在这个时间点对这个Key有大量的并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

# 使用方法

  • singleflight 接口

    //Do方法,传入key,以及回调函数,如果key相同,fn方法只会执行一次,同步等待
    //返回值v:表示fn执行结果
    //返回值err:表示fn的返回的err
    //返回值shared:表示是否是真实fn返回的还是从保存的map[key]返回的,也就是共享的
    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    //DoChan方法类似Do方法,只是返回的是一个chan
    func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    //设计Forget 控制key关联的值是否失效,默认以上两个方法只要fn方法执行完成后,内部维护的fn的值也删除(即并发结束后就失效了)
    func (g *Group) Forget(key string) 
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
  • demo

    package main
    
    import (
    	"golang.org/x/sync/singleflight"
    	"log"
    	"time"
    )
    
    func main() {
    	var singleSetCache singleflight.Group
    
    	getAndSetCache := func(requestID int, cacheKey string) (string, error) {
    		log.Printf("request %v start to get and set cache...", requestID)
    		value, _, _ := singleSetCache.Do(cacheKey, func() (ret interface{}, err error) { //do 的入参key,可以直接使用缓存的key,这样同一个缓存,只有一个协程会去读DB
    			log.Printf("request %v is setting cache...", requestID)
    			time.Sleep(3 * time.Second)
    			log.Printf("request %v set cache success!", requestID)
    			return "VALUE", nil
    		})
    		return value.(string), nil
    	}
    
    	cacheKey := "cacheKey"
    	for i := 1; i < 10; i++ { 
    		go func(requestID int) {
    			value, _ := getAndSetCache(requestID, cacheKey)
    			log.Printf("request %v get value: %v", requestID, value)
    		}(i)
    	}
    	time.Sleep(20 * time.Second)
    }
    
    /* 输出
    2022/07/08 17:43:45 request 4 start to get and set cache...
    2022/07/08 17:43:45 request 4 is setting cache...
    2022/07/08 17:43:45 request 1 start to get and set cache...
    2022/07/08 17:43:45 request 3 start to get and set cache...
    2022/07/08 17:43:45 request 9 start to get and set cache...
    2022/07/08 17:43:45 request 6 start to get and set cache...
    2022/07/08 17:43:45 request 5 start to get and set cache...
    2022/07/08 17:43:45 request 7 start to get and set cache...
    2022/07/08 17:43:45 request 8 start to get and set cache...
    2022/07/08 17:43:45 request 2 start to get and set cache...
    2022/07/08 17:43:48 request 4 set cache success!
    2022/07/08 17:43:48 request 4 get value: VALUE
    2022/07/08 17:43:48 request 2 get value: VALUE
    2022/07/08 17:43:48 request 5 get value: VALUE
    2022/07/08 17:43:48 request 7 get value: VALUE
    2022/07/08 17:43:48 request 8 get value: VALUE
    2022/07/08 17:43:48 request 6 get value: VALUE
    2022/07/08 17:43:48 request 9 get value: VALUE
    2022/07/08 17:43:48 request 1 get value: VALUE
    2022/07/08 17:43:48 request 3 get value: VALUE
    */
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55

# 实现原理

# 1. 核心数据结构

// group
type Group struct {
    mu sync.Mutex
    m  map[string]*call
}
// call
type call struct {
    wg sync.WaitGroup

    val interface{}
    err error

    dups  int
    chans []chan<- Result
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 2. 核心方法

  • SingleFlight 定义一个call结构体,每个结构体都保存了fn调用对应的信息。

  • Do方法的执行逻辑是每次调用Do方法都会先去获取互斥锁,随后判断在映射表里是否已经有Key对应的fn函数调用信息的call结构体。

    • 当不存在时,证明是这个Key的第一次请求,那么会初始化一个call结构体指针,增加SingleFlight内部持有的sync.WaitGroup计数器到1。释放互斥锁,然后阻塞的等待doCall方法执行fn函数的返回结果
    • 当存在时,增加call结构体内代表fn重复调用次数的计数器dups,释放互斥锁,然后使用WaitGroup等待fn函数执行完成。
    func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    	g.mu.Lock()
    	if g.m == nil {
    		g.m = make(map[string]*call)
    	}
    	if c, ok := g.m[key]; ok {  // 存在取值返回
    		c.dups++
    		...
    		return c.val, c.err, true
    	}
    	c := new(call)
    	c.wg.Add(1)
    	g.m[key] = c
    	g.mu.Unlock()
    
    	g.doCall(c, key, fn)   // 不存在调用 docall
    	return c.val, c.err, c.dups > 0
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
  • call结构体的val 和 err 两个字段只会在 doCall方法中执行fn有返回结果后才赋值,所以当 doCall方法 和 WaitGroup.Wait返回时,函数调用的结果和错误会返回给Do方法的所有调用者。

    
    func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    	normalReturn := false
    	recovered := false
    
    	defer func() {
    		// the given function invoked runtime.Goexit
    		if !normalReturn && !recovered {
    			c.err = errGoexit
    		}
    
    		c.wg.Done()
    		g.mu.Lock()
    		defer g.mu.Unlock()
    		if !c.forgotten {
    			delete(g.m, key)
    		}
    
    		if e, ok := c.err.(*panicError); ok {
    			...
    		} else {
    			// Normal return
    			for _, ch := range c.chans {
    				ch <- Result{c.val, c.err, c.dups > 0}  
    			}
    		}
    	}()
    
    	func() {
    		defer func() {
    			if !normalReturn {
    				if r := recover(); r != nil {
    					c.err = newPanicError(r)
    				}
    			}
    		}()
    
    		c.val, c.err = fn()  // 真正的执行调用函数
    		normalReturn = true
    	}()
    
    	if !normalReturn {
    		recovered = true
    	}
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46

# 总结

  1. Group中存入map, 新到的请求按照key存入map, mutex保证并发安全
  2. 如果map中已经存在相同的key, 会wg.wait()等待doCall执行完毕
  3. doCall真正的调用请求方法,只会有一个请求真正执行,执行成功后wg.Done唤醒wg.wait()中的请求
  4. 全部返回最终的value.
上次更新: 2023/04/16, 18:35:33
defer实现原理
timerate令牌桶限流 实现原理

← defer实现原理 timerate令牌桶限流 实现原理→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式