死信队列
DLX ,全称为 Dead-Letter-Exchange ,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况:
- 消息被拒绝 (Basic.Reject/Basic.Nack),井且设置 requeue 参数为 false;
- 消息过期;
- 队列达到最大长度。
DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 , 实际上就是设置某个队列的属性。当这个队列中存在死信时, RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去 ,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的 TTL 设置为 0 配合使用可以弥补 imrnediate 参数的功能。
通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX (代码清单 4-7 中的 dlx exchange):
代码清单 4-7
channel.exchangeDeclare("dlx_exchange" , "direct"); //创建 DLX: dlx_exchange
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", " dlx_exchange" ); //为队列 myqueue 添加 DLX
channel.queueDeclare("myqueue" , false , false , false , args);
2
3
4
5
6
也可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键:
args.put("x-dead-letter-routing-key", "dlx-routing-key");
当然这里也可以通过 Policy 的方式设置:
rabbitmqctl set_policy DLX --apply-to queues
下面创建一个队列,为其设置 TTL 和 DLX 等,如代码清单 4-8 所示。
代码清单 4-8
channel.exchangeDeclare("exchange.dlx", "direct", true);
channel.exchangeDeclare("exchange.normal" , "fanout", true);
Map<String, Object> args = new HashMap<String , Object>();
args.put("x-message-ttl", 10000);
args.put("x-dead-letter-exchange" , "exchange.dlx");
args.put("x-dead-letter-routing-key", "routingkey" );
channe1.queueDec1are("queue.norma1", true, false, false, args);
channe1.queueBind("queue.normal", "exchange.normal", "");
channe1.queueDec1are("queue.d1x", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingkey");
channel.basicPublish("exchange.normal" , "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
2
3
4
5
6
7
8
9
10
11
12
13
这里创建了两个交换器 exchange.normal 和 exchange.dlx ,分别绑定两个队列 queue.normal 和 queue.dlx。
由 Web 管理页面 (图 4-3) 可以看出,两个队列都被标记了 "D",这个是 durable 的缩写, 即设置了队列持久化。 queue.normal 这个队列还配置了 TTL、 DLX 和 DLK,其中 DLX 指的是 x-dead-letter-routing-key 这个属性。
参考图 4-4 , 生产者首先发送一条携带路由键为 "rk" 的消息,然后经过交换器 exchange.normal 顺利地存储到队列 queue.normal 中 。由于队列 queue.normal 设置了过期时间为 10s , 在这 10s 内没有消费者消费这条消息,那么判定这条消息为过期。由于设置了 DLX , 过期之时 , 消息被丢给交换器 exchange.dlx 中,这时找到与 exchange.dlx 匹配的队列 queue.dlx, 最 后消息被存储在 queue.dlx 这个死信队列中。
对于 RabbitMQ 来说, DLX 是一个非常有用的特性。 它可以处理异常情况下,消息不能够被消费者正确消费 (消费者调用了 Basic.Nack 或者 Basic.Reject) 而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。 DLX 配合 TTL 使用还可以实现延迟队列的功能,详细请看下一节。
# golang 实现
golang 相关代码如下:
消费者
package main
import (
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
DLXConsumer()
}
// DLXConsumer 死信队列消费者
func DLXConsumer() {
conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
if err != nil {
log.Fatal(err)
}
ch, err := conn.Channel()
if err != nil {
log.Fatal(err)
}
// 死信队列中的 routingkey 如果在 生产端没设置 和 queue name 是一样的
dlxExchange := "dlx_exchage"
queueName := "test_queue_dlx"
routingKey := "test_queue"
_ = ch.ExchangeDeclare(dlxExchange, amqp.ExchangeDirect, false, false, false, false, nil)
q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
ch.QueueBind(q.Name, routingKey, dlxExchange, false, nil)
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)
for v := range msgs {
fmt.Println("[DLX] msg: ", string(v.Body))
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
生产者
package main
import (
"context"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
DLXSend()
}
// DLXSend 死信队列的发送者
func DLXSend() {
// 绑定死信队列
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
dlxExchange := "dlx_exchage"
queueName := "test_queue"
ch.ExchangeDeclare(dlxExchange, amqp.ExchangeDirect, false, false, false, false, nil)
q, _ := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{
"x-dead-letter-exchange": dlxExchange,
})
ticker := time.NewTicker(time.Millisecond * 1000)
for range ticker.C {
msg := "hello wolrd" + time.Now().Format(time.RFC3339Nano)
fmt.Println(msg)
_ = ch.PublishWithContext(context.Background(), "", q.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
Expiration: "5000", // 单位是 毫秒
DeliveryMode: 2,
})
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41