优先级队列
优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。
可以通过设置队列的 x-max-priority 参数来实现。示例代码如代码清单 4-9 所示。
代码清单 4-9
Map<String , Object> args = new HashMap<String , Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("queue.priority", true, false, false, args) ;
1
2
3
2
3
通过 Web 管理页面可以看到 "Pri" 的标识 ,如图 4-6 所示。
上面的代码演示的是如何配置一个队列的最大优先级。在此之后,需要在发送时在消息中设置消息当前的优先级。示例代码如代码清单 4-10 所示。
代码清单 4-10
AMQP.BasicProperties.Bui1der builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
AMQP.BasicProperties properties = builder.build();
channel.basicPub1ish("exchange_priority", "rk_priority", properties, ("messages").getBytes());
1
2
3
4
2
3
4
上面的代码中设置消息的优先级为 5 。默认最低为 0 ,最高为队列设置的最大优先级。优先级高的消息可以被优先消费,这个也是有前提的:如果在消费者的消费速度大于生产者的速度且 Broker 中没有消息堆积的情况下, 对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于 Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
go 代码实现:
生产者端:
package main
import (
"context"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
PrioritySend()
}
func PrioritySend() {
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
ch.QueueDeclare("test_priority", true, false, false, false, amqp.Table{
"x-max-priority": 5,
})
ticker := time.NewTicker(time.Millisecond * 1000)
num := 0
for range ticker.C {
msg := "delay_msg: " + time.Now().Format(time.RFC3339Nano)
num++
var p uint8 = 1
if num%2 == 0 {
p = 5
}
ch.PublishWithContext(context.Background(), "", "test_priority", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
Priority: p,
})
fmt.Printf("[priority_send]: %s, priority: %d\n", msg, p)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
消费者端:
package main
import (
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
PriorityConsumer()
}
func PriorityConsumer() {
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
msgs, _ := ch.Consume("test_priority", "", true, false, false, false, nil)
for msg := range msgs {
fmt.Printf("[consumer_priority]: %s, priority: %d\n", string(msg.Body), msg.Priority)
time.Sleep(time.Second * 2)
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
实际运行代码可以看出开始只会消费 priority 为 5 的消息。
上次更新: 5/9/2023, 10:58:32 AM