Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

  • 客户端开发向导

  • RabbitMQ 进阶

    • 消息何去何从
    • 过期时间 (TTL)
    • 死信队列
    • 延迟队列
    • 优先级队列
    • RPC 实现
    • 持久化
    • 生产者确认
    • 消费端要点介绍
    • 消息传输保障
    • 小结
  • RabbitMQ 管理

  • RabbitMQ实战指南
  • RabbitMQ 进阶
ezreal_rao
2023-04-06

优先级队列

优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。

可以通过设置队列的 x-max-priority 参数来实现。示例代码如代码清单 4-9 所示。

代码清单 4-9

Map<String , Object> args = new HashMap<String , Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue.priority", true, false, false, args) ;
1
2
3

通过 Web 管理页面可以看到 "Pri" 的标识 ,如图 4-6 所示。

优先级队列的属性展示

上面的代码演示的是如何配置一个队列的最大优先级。在此之后,需要在发送时在消息中设置消息当前的优先级。示例代码如代码清单 4-10 所示。

代码清单 4-10

AMQP.BasicProperties.Bui1der builder = new AMQP.BasicProperties.Builder(); 
builder.priority(5); 
AMQP.BasicProperties properties = builder.build(); 
channel.basicPub1ish("exchange_priority", "rk_priority", properties, ("messages").getBytes());
1
2
3
4

上面的代码中设置消息的优先级为 5 。默认最低为 0 ,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度且 Broker 中没有消息堆积的情况下, 对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。

go 代码实现:

生产者端:

package main

import (
	"context"
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	PrioritySend()
}

func PrioritySend() {
	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
	ch, _ := conn.Channel()
	ch.QueueDeclare("test_priority", true, false, false, false, amqp.Table{
		"x-max-priority": 5,
	})
	ticker := time.NewTicker(time.Millisecond * 1000)
	num := 0
	for range ticker.C {
		msg := "delay_msg: " + time.Now().Format(time.RFC3339Nano)
		num++
		var p uint8 = 1
		if num%2 == 0 {
			p = 5
		}
		ch.PublishWithContext(context.Background(), "", "test_priority", false, false, amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(msg),
			Priority:    p,
		})
		fmt.Printf("[priority_send]: %s, priority: %d\n", msg, p)
	}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

消费者端:

package main

import (
	"fmt"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	PriorityConsumer()
}

func PriorityConsumer() {
	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
	ch, _ := conn.Channel()

	msgs, _ := ch.Consume("test_priority", "", true, false, false, false, nil)
	for msg := range msgs {
		fmt.Printf("[consumer_priority]: %s, priority: %d\n", string(msg.Body), msg.Priority)
		time.Sleep(time.Second * 2)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

实际运行代码可以看出开始只会消费 priority 为 5 的消息。

#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
延迟队列
RPC 实现

← 延迟队列 RPC 实现→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式