队列:延迟队列
# 概念
- **延迟队列(Delay Queue)**是一种特殊类型的消息队列,它允许将消息推送到队列中,并指定一个延迟时间,在经过一定时间后才会将消息转发给消费者进行处理。延迟队列在实际应用中具有广泛的用途,例如实现消息重试、定时任务调度、延迟处理等场景。
# 应用场景
- 消息重试:当某条消息处理失败时,可以将消息放入延迟队列,并设置一定的延迟时间,以便稍后再次尝试处理。
- 定时任务调度:延迟队列常用于定时任务调度,可以将需要执行的任务作为消息放入延迟队列,并设置延迟时间,队列会在预定的时间之后将消息推送给消费者执行任务。
- 延迟处理:在一些场景下,需要对一些操作进行延迟处理,例如订单确认、退款等。
# 实现方式
# 1. 基于 数据结构实现
# 1.1 优先队列
优点:
- 快速查找:通过堆的性质,可以在 O(1) 时间复杂度内获取下一个要处理的消息。
- 高效插入:插入新消息的时间复杂度通常为 O(log n),由于堆的结构调整。
- 支持批量处理:可以一次性处理多个过期消息,提高效率。
缺点:
- 队列长度动态变化:由于延迟队列的长度随时间动态变化,可能需要进行动态扩容,带来额外的时间和空间消耗。
- 需要定期检查:为了及时处理过期消息,需要定期检查队列中的消息并进行处理,增加了一定的系统开销。
code
package delayqueue import ( "container/heap" "sync" "sync/atomic" "time" ) // The start of PriorityQueue implementation. // Borrowed from https://github.com/nsqio/nsq/blob/master/internal/pqueue/pqueue.go type item struct { Value interface{} Priority int64 Index int } // this is a priority queue as implemented by a min heap // ie. the 0th element is the *lowest* value type priorityQueue []*item func newPriorityQueue(capacity int) priorityQueue { return make(priorityQueue, 0, capacity) } func (pq priorityQueue) Len() int { return len(pq) } func (pq priorityQueue) Less(i, j int) bool { return pq[i].Priority < pq[j].Priority } func (pq priorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] pq[i].Index = i pq[j].Index = j } func (pq *priorityQueue) Push(x interface{}) { n := len(*pq) c := cap(*pq) if n+1 > c { npq := make(priorityQueue, n, c*2) copy(npq, *pq) *pq = npq } *pq = (*pq)[0 : n+1] item := x.(*item) item.Index = n (*pq)[n] = item } func (pq *priorityQueue) Pop() interface{} { n := len(*pq) c := cap(*pq) if n < (c/2) && c > 25 { npq := make(priorityQueue, n, c/2) copy(npq, *pq) *pq = npq } item := (*pq)[n-1] item.Index = -1 *pq = (*pq)[0 : n-1] return item } func (pq *priorityQueue) PeekAndShift(max int64) (*item, int64) { if pq.Len() == 0 { return nil, 0 } item := (*pq)[0] if item.Priority > max { return nil, item.Priority - max } heap.Remove(pq, 0) return item, 0 } // The end of PriorityQueue implementation. // DelayQueue is an unbounded blocking queue of *Delayed* elements, in which // an element can only be taken when its delay has expired. The head of the // queue is the *Delayed* element whose delay expired furthest in the past. type DelayQueue struct { C chan interface{} mu sync.Mutex pq priorityQueue // Similar to the sleeping state of runtime.timers. sleeping int32 wakeupC chan struct{} } // New creates an instance of delayQueue with the specified size. func New(size int) *DelayQueue { return &DelayQueue{ C: make(chan interface{}), pq: newPriorityQueue(size), wakeupC: make(chan struct{}), } } // Offer inserts the element into the current queue. func (dq *DelayQueue) Offer(elem interface{}, expiration int64) { item := &item{Value: elem, Priority: expiration} dq.mu.Lock() heap.Push(&dq.pq, item) index := item.Index dq.mu.Unlock() if index == 0 { // A new item with the earliest expiration is added. if atomic.CompareAndSwapInt32(&dq.sleeping, 1, 0) { dq.wakeupC <- struct{}{} } } } // Poll starts an infinite loop, in which it continually waits for an element // to expire and then send the expired element to the channel C. func (dq *DelayQueue) Poll(exitC chan struct{}, nowF func() int64) { for { now := nowF() dq.mu.Lock() item, delta := dq.pq.PeekAndShift(now) if item == nil { // No items left or at least one item is pending. // We must ensure the atomicity of the whole operation, which is // composed of the above PeekAndShift and the following StoreInt32, // to avoid possible race conditions between Offer and Poll. atomic.StoreInt32(&dq.sleeping, 1) } dq.mu.Unlock() if item == nil { if delta == 0 { // No items left. select { case <-dq.wakeupC: // Wait until a new item is added. continue case <-exitC: goto exit } } else if delta > 0 { // At least one item is pending. select { case <-dq.wakeupC: // A new item with an "earlier" expiration than the current "earliest" one is added. continue case <-time.After(time.Duration(delta) * time.Millisecond): // The current "earliest" item expires. // Reset the sleeping state since there's no need to receive from wakeupC. if atomic.SwapInt32(&dq.sleeping, 0) == 0 { // A caller of Offer() is being blocked on sending to wakeupC, // drain wakeupC to unblock the caller. <-dq.wakeupC } continue case <-exitC: goto exit } } } select { case dq.C <- item.Value: // The expired element has been sent out successfully. case <-exitC: goto exit } } exit: // Reset the states atomic.StoreInt32(&dq.sleeping, 0) }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# 1.2 排序链表
- 优点:
- 高效插入:插入新消息的时间复杂度通常为 O(1),因为只需找到插入位置并进行链表节点的插入。
- 不需要定期检查:由于链表是有序的,只需检查链表头即可得知下一个要处理的消息。
- 缺点:
- 查找效率较低:为了找到下一个要处理的消息,可能需要遍历链表,导致查找时间复杂度为 O(n)。
# 2. 基于Redis
# 2.1 key expiration
- 要是利用
Redis
中的 Key 过期机制,简单来讲,就是利用Redis
中的发布/订阅功能,如果我们开启了Redis
的 Key 过期事件监听,那么,当某个 Key 过期的时候,Redis
就会把这条消息发布出来,通过订阅这个事件,从而达到延迟队列的效果。首先,确保Redis
开启了 Key 过期事件监听,修改 Redis 的配置文件redis.conf
如notify-keyspace-events Ex
, 在这种情况下,如果我们为某一个 Key 指定了过期时间,那么,当到达这个过期时间以后,Redis
会向名为__keyevent@0__:expired
的频道中推送一条消息,消息的内容为过期的这个 Key
# 2.2 zset
- 利用Redis的Zset可以实现延迟队列的效果,通过设置Score属性为集合中的成员进行从小到大的排序。
# 3. 基于 消息队列
- rabbit
- RabbitMQ 存在两个属性: *TTL(Time To Live)*和 DLX(Dead Letter Exchange) 。
- TTL指消息存活的时间,RabbitMQ通过x-message-tt参数来设置指定队列(队列中所有消息都具有相同的过期时间)或消息(某一条消息设置过期时间)上消息的存活时间,它的值是一个非负整数,单位为微秒。如果同时设置队列和队列中消息的TTL,则以较小的值为准。超过TTL的消息则成为Dead Letter(死信)。
- 死信可以重新路由到另一个Exchange(交换机),让消息重新被消费。通过设置x-dead-letter-exchange(Dead Letter重新路的交换机)和x-dead-letter-routing-key(转发的队列)来实现。
- kafka
# 4. 基于 时间轮算法
- 时间轮能够高效地利用线程资源来进行批量化调度,将任务绑定到同一个的调度器上面,使用这个调度器来进行任务的管理、触发以及运行
上次更新: 2023/08/27, 21:33:49