Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

  • 客户端开发向导

  • RabbitMQ 进阶

    • 消息何去何从
    • 过期时间 (TTL)
    • 死信队列
    • 延迟队列
    • 优先级队列
    • RPC 实现
    • 持久化
    • 生产者确认
    • 消费端要点介绍
    • 消息传输保障
    • 小结
  • RabbitMQ 管理

  • RabbitMQ实战指南
  • RabbitMQ 进阶
ezreal_rao
2023-04-06

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓 "延迟消息" 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

延迟队列的使用场景有很多,比如:

  • 在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延迟队列来处理这些订单了 。
  • 用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备。

在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。

在图 4-4 中,不仅展示的是死信队列的用法,也是延迟队列的用法,对于 queue.dlx 这个死信队列来说,同样可以看作延迟队列。假设一个应用中需要将每条消息都设置为 10 秒的延迟, 生产者通过 exchange.normal 这个交换器将发送的消息存储在 queue.normal 这个队列中。消费者订阅的并非是 queue.normal 这个队列,而是 queue.dlx 这个队列 。当消息从 queue.normal 这个队列中过期之后被存入 queue.dlx 这个队列中,消费者就恰巧消费到了延迟 10 秒的这条消息 。

在真实应用中,对于延迟队列可以根据延迟时间的长短分为多个等级,一般分为 5 秒、 10 秒、 30 秒、 1 分钟、 5 分钟、 10 分钟、 30 分钟、 1 小时这几个维度,当然也可以再细化一下。

参考图 4-5 ,为了简化说明,这里只设置了 5 秒、 10 秒、 30 秒、 l 分钟这四个等级。根据应用需求的不同,生产者在发送消息的时候通过设置不同的路由键,以此将消息发送到与交换器绑定的不同的队列中。这里队列分别设置了过期时间为 5 秒、 10 秒、 30 秒、 1 分钟,同时也分别配置了 DLX 和相应的死信队列。当相应的消息过期时,就会转存到相应的死信队列 (即延迟队列) 中,这样消费者根据业务自身的情况,分别选择不同延迟等级的延迟队列进行消费 。

延迟队列

另外,也可以使用 rabbitmq rabbitmq_delayed_message_exchange (opens new window) 插件来实现延时队列。

生产者

package main

import (
	"context"
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	DelaySend()
}

// DelaySend 延时队列发送
func DelaySend() {
	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
	ch, _ := conn.Channel()
	exchangeName := "test_delay"
	routingKey := "test.delay"
	queueName := "test_delay"

	// 声明 队列
	ch.QueueDeclare(queueName, true, false, false, false, nil)

	// 声明交换机
	ch.ExchangeDeclare(exchangeName, "x-delayed-message", false, false, false, false, amqp.Table{
		"x-delayed-type": "direct",
	})

	// 绑定队列到交换机
	ch.QueueBind(queueName, routingKey, exchangeName, false, nil)
	ticker := time.NewTicker(time.Millisecond * 1000)
	for range ticker.C {
		msg := "delay_msg: " + time.Now().Format(time.RFC3339Nano)
		_ = ch.PublishWithContext(context.Background(), exchangeName, routingKey, false, false, amqp.Publishing{
			Headers: amqp.Table{
				"x-delay": "10000",
			},
			ContentType: "text/plain",
			Body:        []byte(msg),
		})
		fmt.Println("[delay_send]: ", msg)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

消费者

package main

import (
	"fmt"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	DelayConsumer()
}

func DelayConsumer() {
	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
	ch, _ := conn.Channel()
	queueName := "test_delay"
	msgs, _ := ch.Consume(queueName, "", true, false, false, false, nil)
	for msg := range msgs {
		fmt.Println("[delay_consumer]: ", string(msg.Body))
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
死信队列
优先级队列

← 死信队列 优先级队列→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式