刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • 设计模式

  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
      • 概念
      • 应用场景
      • 实现方式
        • 1. 基于 数据结构实现
        • 1.1 优先队列
        • 1.2 排序链表
        • 2. 基于Redis
        • 2.1 key expiration
        • 2.2 zset
        • 3. 基于 消息队列
        • 4. 基于 时间轮算法
  • 算法

  • 数据结构与算法
  • 数据结构
bigox
2022-06-10
目录

队列:延迟队列

# 概念

  • **延迟队列(Delay Queue)**是一种特殊类型的消息队列,它允许将消息推送到队列中,并指定一个延迟时间,在经过一定时间后才会将消息转发给消费者进行处理。延迟队列在实际应用中具有广泛的用途,例如实现消息重试、定时任务调度、延迟处理等场景。

# 应用场景

  • 消息重试:当某条消息处理失败时,可以将消息放入延迟队列,并设置一定的延迟时间,以便稍后再次尝试处理。
  • 定时任务调度:延迟队列常用于定时任务调度,可以将需要执行的任务作为消息放入延迟队列,并设置延迟时间,队列会在预定的时间之后将消息推送给消费者执行任务。
  • 延迟处理:在一些场景下,需要对一些操作进行延迟处理,例如订单确认、退款等。

# 实现方式

# 1. 基于 数据结构实现

# 1.1 优先队列

  • 优点:

    • 快速查找:通过堆的性质,可以在 O(1) 时间复杂度内获取下一个要处理的消息。
    • 高效插入:插入新消息的时间复杂度通常为 O(log n),由于堆的结构调整。
    • 支持批量处理:可以一次性处理多个过期消息,提高效率。
  • 缺点:

    • 队列长度动态变化:由于延迟队列的长度随时间动态变化,可能需要进行动态扩容,带来额外的时间和空间消耗。
    • 需要定期检查:为了及时处理过期消息,需要定期检查队列中的消息并进行处理,增加了一定的系统开销。
  • code

    package delayqueue
    
    import (
    	"container/heap"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    // The start of PriorityQueue implementation.
    // Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go
    
    type item struct {
    	Value    interface{}
    	Priority int64
    	Index    int
    }
    
    // this is a priority queue as implemented by a min heap
    // ie. the 0th element is the *lowest* value
    type priorityQueue []*item
    
    func newPriorityQueue(capacity int) priorityQueue {
    	return make(priorityQueue, 0, capacity)
    }
    
    func (pq priorityQueue) Len() int {
    	return len(pq)
    }
    
    func (pq priorityQueue) Less(i, j int) bool {
    	return pq[i].Priority < pq[j].Priority
    }
    
    func (pq priorityQueue) Swap(i, j int) {
    	pq[i], pq[j] = pq[j], pq[i]
    	pq[i].Index = i
    	pq[j].Index = j
    }
    
    func (pq *priorityQueue) Push(x interface{}) {
    	n := len(*pq)
    	c := cap(*pq)
    	if n+1 > c {
    		npq := make(priorityQueue, n, c*2)
    		copy(npq, *pq)
    		*pq = npq
    	}
    	*pq = (*pq)[0 : n+1]
    	item := x.(*item)
    	item.Index = n
    	(*pq)[n] = item
    }
    
    func (pq *priorityQueue) Pop() interface{} {
    	n := len(*pq)
    	c := cap(*pq)
    	if n < (c/2) && c > 25 {
    		npq := make(priorityQueue, n, c/2)
    		copy(npq, *pq)
    		*pq = npq
    	}
    	item := (*pq)[n-1]
    	item.Index = -1
    	*pq = (*pq)[0 : n-1]
    	return item
    }
    
    func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) {
    	if pq.Len() == 0 {
    		return nil, 0
    	}
    
    	item := (*pq)[0]
    	if item.Priority > max {
    		return nil, item.Priority - max
    	}
    	heap.Remove(pq, 0)
    
    	return item, 0
    }
    
    // The end of PriorityQueue implementation.
    
    // DelayQueue is an unbounded blocking queue of *Delayed* elements, in which
    // an element can only be taken when its delay has expired. The head of the
    // queue is the *Delayed* element whose delay expired furthest in the past.
    type DelayQueue struct {
    	C chan interface{}
    
    	mu sync.Mutex
    	pq priorityQueue
    
    	// Similar to the sleeping state of runtime.timers.
    	sleeping int32
    	wakeupC  chan struct{}
    }
    
    // New creates an instance of delayQueue with the specified size.
    func New(size int) *DelayQueue {
    	return &DelayQueue{
    		C:       make(chan interface{}),
    		pq:      newPriorityQueue(size),
    		wakeupC: make(chan struct{}),
    	}
    }
    
    // Offer inserts the element into the current queue.
    func (dq *DelayQueue) Offer(elem interface{}, expiration int64) {
    	item := &item{Value: elem, Priority: expiration}
    
    	dq.mu.Lock()
    	heap.Push(&dq.pq, item)
    	index := item.Index
    	dq.mu.Unlock()
    
    	if index == 0 {
    		// A new item with the earliest expiration is added.
    		if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) {
    			dq.wakeupC <- struct{}{}
    		}
    	}
    }
    
    // Poll starts an infinite loop, in which it continually waits for an element
    // to expire and then send the expired element to the channel C.
    func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) {
    	for {
    		now := nowF()
    
    		dq.mu.Lock()
    		item, delta := dq.pq.PeekAndShift(now)
    		if item == nil {
    			// No items left or at least one item is pending.
    
    			// We must ensure the atomicity of the whole operation, which is
    			// composed of the above PeekAndShift and the following StoreInt32,
    			// to avoid possible race conditions between Offer and Poll.
    			atomic.StoreInt32(&dq.sleeping, 1)
    		}
    		dq.mu.Unlock()
    
    		if item == nil {
    			if delta == 0 {
    				// No items left.
    				select {
    				case <-dq.wakeupC:
    					// Wait until a new item is added.
    					continue
    				case <-exitC:
    					goto exit
    				}
    			} else if delta > 0 {
    				// At least one item is pending.
    				select {
    				case <-dq.wakeupC:
    					// A new item with an "earlier" expiration than the current "earliest" one is added.
    					continue
    				case <-time.After(time.Duration(delta) * time.Millisecond):
    					// The current "earliest" item expires.
    
    					// Reset the sleeping state since there's no need to receive from wakeupC.
    					if atomic.SwapInt32(&dq.sleeping, 0) == 0 {
    						// A caller of Offer() is being blocked on sending to wakeupC,
    						// drain wakeupC to unblock the caller.
    						<-dq.wakeupC
    					}
    					continue
    				case <-exitC:
    					goto exit
    				}
    			}
    		}
    
    		select {
    		case dq.C <- item.Value:
    			// The expired element has been sent out successfully.
    		case <-exitC:
    			goto exit
    		}
    	}
    
    exit:
    	// Reset the states
    	atomic.StoreInt32(&dq.sleeping, 0)
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187

# 1.2 排序链表

  • 优点:
    • 高效插入:插入新消息的时间复杂度通常为 O(1),因为只需找到插入位置并进行链表节点的插入。
    • 不需要定期检查:由于链表是有序的,只需检查链表头即可得知下一个要处理的消息。
  • 缺点:
    • 查找效率较低:为了找到下一个要处理的消息,可能需要遍历链表,导致查找时间复杂度为 O(n)。

# 2. 基于Redis

# 2.1 key expiration

  • 要是利用 Redis 中的 Key 过期机制,简单来讲,就是利用 Redis 中的发布/订阅功能,如果我们开启了 Redis 的 Key 过期事件监听,那么,当某个 Key 过期的时候,Redis 就会把这条消息发布出来,通过订阅这个事件,从而达到延迟队列的效果。首先,确保 Redis 开启了 Key 过期事件监听,修改 Redis 的配置文件 redis.conf 如notify-keyspace-events Ex, 在这种情况下,如果我们为某一个 Key 指定了过期时间,那么,当到达这个过期时间以后,Redis 会向名为 __keyevent@0__:expired 的频道中推送一条消息,消息的内容为过期的这个 Key

# 2.2 zset

  • 利用Redis的Zset可以实现延迟队列的效果,通过设置Score属性为集合中的成员进行从小到大的排序。

# 3. 基于 消息队列

  • rabbit
    • RabbitMQ 存在两个属性: *TTL(Time To Live)*和 DLX(Dead Letter Exchange) 。
    • TTL指消息存活的时间,RabbitMQ通过x-message-tt参数来设置指定队列(队列中所有消息都具有相同的过期时间)或消息(某一条消息设置过期时间)上消息的存活时间,它的值是一个非负整数,单位为微秒。如果同时设置队列和队列中消息的TTL,则以较小的值为准。超过TTL的消息则成为Dead Letter(死信)。
    • 死信可以重新路由到另一个Exchange(交换机),让消息重新被消费。通过设置x-dead-letter-exchange(Dead Letter重新路的交换机)和x-dead-letter-routing-key(转发的队列)来实现。
  • kafka

# 4. 基于 时间轮算法

  • 时间轮能够高效地利用线程资源来进行批量化调度,将任务绑定到同一个的调度器上面,使用这个调度器来进行任务的管理、触发以及运行
#数据结构
上次更新: 2023/08/27, 21:33:49
队列:优先队列
递归算法

← 队列:优先队列 递归算法→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>
Theme by Vdoing | Copyright © 2020-2024 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式