延迟队列
延迟队列存储的对象是对应的延迟消息,所谓 "延迟消息" 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
延迟队列的使用场景有很多,比如:
- 在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了 。
- 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。
在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。
在图 4-4 中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于 queue.dlx 这个死信队列来说,同样可以看作延迟队列。假设一个应用中需要将每条消息都设置为 10 秒的延迟, 生产者通过 exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中。消费者订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列 。当消息从 queue.normal 这个队列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息 。
在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为 5 秒、 10 秒、 30 秒、 1 分钟、 5 分钟、 10 分钟、 30 分钟、 1 小时这几个维度,当然也可以再细化一下。
参考图 4-5 ,为了简化说明,这里只设置了 5 秒、 10 秒、 30 秒、 l 分钟这四个等级。根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,以此将消息发送到与交换器绑定的不同的队列中。这里队列分别设置了过期时间为 5 秒、 10 秒、 30 秒、 1 分钟,同时也分别配置了 DLX 和相应的死信队列。当相应的消息过期时,就会转存到相应的死信队列 (即延迟队列) 中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费 。
另外,也可以使用 rabbitmq rabbitmq_delayed_message_exchange
(opens new window) 插件来实现延时队列。
生产者
package main
import (
"context"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
DelaySend()
}
// DelaySend 延时队列发送
func DelaySend() {
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
exchangeName := "test_delay"
routingKey := "test.delay"
queueName := "test_delay"
// 声明 队列
ch.QueueDeclare(queueName, true, false, false, false, nil)
// 声明交换机
ch.ExchangeDeclare(exchangeName, "x-delayed-message", false, false, false, false, amqp.Table{
"x-delayed-type": "direct",
})
// 绑定队列到交换机
ch.QueueBind(queueName, routingKey, exchangeName, false, nil)
ticker := time.NewTicker(time.Millisecond * 1000)
for range ticker.C {
msg := "delay_msg: " + time.Now().Format(time.RFC3339Nano)
_ = ch.PublishWithContext(context.Background(), exchangeName, routingKey, false, false, amqp.Publishing{
Headers: amqp.Table{
"x-delay": "10000",
},
ContentType: "text/plain",
Body: []byte(msg),
})
fmt.Println("[delay_send]: ", msg)
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
消费者
package main
import (
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
DelayConsumer()
}
func DelayConsumer() {
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
queueName := "test_delay"
msgs, _ := ch.Consume(queueName, "", true, false, false, false, nil)
for msg := range msgs {
fmt.Println("[delay_consumer]: ", string(msg.Body))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23