Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

  • 客户端开发向导

  • RabbitMQ 进阶

    • 消息何去何从
    • 过期时间 (TTL)
    • 死信队列
      • 延迟队列
      • 优先级队列
      • RPC 实现
      • 持久化
      • 生产者确认
      • 消费端要点介绍
      • 消息传输保障
      • 小结
    • RabbitMQ 管理

    • RabbitMQ实战指南
    • RabbitMQ 进阶
    ezreal_rao
    2023-04-06
    目录

    死信队列

    DLX ,全称为 Dead-Letter-Exchange ,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。

    消息变成死信一般是由于以下几种情况:

    • 消息被拒绝 (Basic.Reject/Basic.Nack),井且设置 requeue 参数为 false;
    • 消息过期;
    • 队列达到最大长度。

    DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定 , 实际上就是设置某个队列的属性。当这个队列中存在死信时, RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去 ,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理,这个特性与将消息的 TTL 设置为 0 配合使用可以弥补 imrnediate 参数的功能。

    通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX (代码清单 4-7 中的 dlx exchange):

    代码清单 4-7

    channel.exchangeDeclare("dlx_exchange" , "direct"); //创建 DLX: dlx_exchange
    
    Map<String, Object> args = new HashMap<String, Object>();
    
    args.put("x-dead-letter-exchange", " dlx_exchange" ); //为队列 myqueue 添加 DLX 
    channel.queueDeclare("myqueue" , false , false , false , args);
    
    1
    2
    3
    4
    5
    6

    也可以为这个 DLX 指定路由键,如果没有特殊指定,则使用原队列的路由键:

    args.put("x-dead-letter-routing-key", "dlx-routing-key");
    
    1

    当然这里也可以通过 Policy 的方式设置:

    rabbitmqctl set_policy DLX --apply-to queues
    
    1

    下面创建一个队列,为其设置 TTL 和 DLX 等,如代码清单 4-8 所示。

    代码清单 4-8

    channel.exchangeDeclare("exchange.dlx", "direct", true); 
    channel.exchangeDeclare("exchange.normal" , "fanout", true);
    
    Map<String, Object> args = new HashMap<String , Object>();
    
    args.put("x-message-ttl", 10000); 
    args.put("x-dead-letter-exchange" , "exchange.dlx"); 
    args.put("x-dead-letter-routing-key", "routingkey" ); 
    channe1.queueDec1are("queue.norma1", true, false, false, args); 
    channe1.queueBind("queue.normal", "exchange.normal", ""); 
    channe1.queueDec1are("queue.d1x", true, false, false, null); 
    channel.queueBind("queue.dlx", "exchange.dlx", "routingkey"); 
    channel.basicPublish("exchange.normal" , "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13

    这里创建了两个交换器 exchange.normal 和 exchange.dlx ,分别绑定两个队列 queue.normal 和 queue.dlx。

    由 Web 管理页面 (图 4-3) 可以看出,两个队列都被标记了 "D",这个是 durable 的缩写, 即设置了队列持久化。 queue.normal 这个队列还配置了 TTL、 DLX 和 DLK,其中 DLX 指的是 x-dead-letter-routing-key 这个属性。

    队列的属性展示

    参考图 4-4 , 生产者首先发送一条携带路由键为 "rk" 的消息,然后经过交换器 exchange.normal 顺利地存储到队列 queue.normal 中 。由于队列 queue.normal 设置了过期时间为 10s , 在这 10s 内没有消费者消费这条消息,那么判定这条消息为过期。由于设置了 DLX , 过期之时 , 消息被丢给交换器 exchange.dlx 中,这时找到与 exchange.dlx 匹配的队列 queue.dlx, 最 后消息被存储在 queue.dlx 这个死信队列中。

    死信队列

    对于 RabbitMQ 来说, DLX 是一个非常有用的特性。 它可以处理异常情况下,消息不能够被消费者正确消费 (消费者调用了 Basic.Nack 或者 Basic.Reject) 而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。 DLX 配合 TTL 使用还可以实现延迟队列的功能,详细请看下一节。

    # golang 实现

    golang 相关代码如下:

    消费者

    package main
    
    import (
    	"fmt"
    	"log"
    	"time"
    
    	amqp "github.com/rabbitmq/amqp091-go"
    )
    
    func main() {
    	DLXConsumer()
    }
    
    // DLXConsumer 死信队列消费者
    func DLXConsumer() {
    	conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
    	if err != nil {
    		log.Fatal(err)
    	}
    	ch, err := conn.Channel()
    	if err != nil {
    		log.Fatal(err)
    	}
    	// 死信队列中的 routingkey 如果在 生产端没设置 和 queue name 是一样的
    	dlxExchange := "dlx_exchage"
    	queueName := "test_queue_dlx"
    	routingKey := "test_queue"
    	_ = ch.ExchangeDeclare(dlxExchange, amqp.ExchangeDirect, false, false, false, false, nil)
    	q, _ := ch.QueueDeclare(queueName, true, false, false, false, nil)
    	ch.QueueBind(q.Name, routingKey, dlxExchange, false, nil)
    	msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)
    	for v := range msgs {
    		fmt.Println("[DLX] msg: ", string(v.Body))
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36

    生产者

    package main
    
    import (
    	"context"
    	"fmt"
    	"time"
    
    	amqp "github.com/rabbitmq/amqp091-go"
    )
    
    func main() {
    	DLXSend()
    }
    
    // DLXSend  死信队列的发送者
    func DLXSend() {
    	// 绑定死信队列
    	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
    	ch, _ := conn.Channel()
    
    	dlxExchange := "dlx_exchage"
    	queueName := "test_queue"
    
    	ch.ExchangeDeclare(dlxExchange, amqp.ExchangeDirect, false, false, false, false, nil)
    
    	q, _ := ch.QueueDeclare(queueName, true, false, false, false, amqp.Table{
    		"x-dead-letter-exchange": dlxExchange,
    	})
    
    	ticker := time.NewTicker(time.Millisecond * 1000)
    	for range ticker.C {
    		msg := "hello wolrd" + time.Now().Format(time.RFC3339Nano)
    		fmt.Println(msg)
    		_ = ch.PublishWithContext(context.Background(), "", q.Name, false, false, amqp.Publishing{
    			ContentType:  "text/plain",
    			Body:         []byte(msg),
    			Expiration:   "5000", // 单位是 毫秒
    			DeliveryMode: 2,
    		})
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    #mq#rabbitmq#amq
    上次更新: 5/9/2023, 10:58:32 AM
    过期时间 (TTL)
    延迟队列

    ← 过期时间 (TTL) 延迟队列→

    最近更新
    01
    为什么我的MySQL会抖一下
    07-15
    02
    HTTP 性能优化面面观
    07-12
    03
    WebSocket:沙盒里的 TCP
    07-12
    更多文章>
    Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
    豫ICP备2023001810号
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式