Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

  • 客户端开发向导

  • RabbitMQ 进阶

    • 消息何去何从
      • mandatory 参数
      • immediate 参数
      • 备份交换器
    • 过期时间 (TTL)
    • 死信队列
    • 延迟队列
    • 优先级队列
    • RPC 实现
    • 持久化
    • 生产者确认
    • 消费端要点介绍
    • 消息传输保障
    • 小结
  • RabbitMQ 管理

  • RabbitMQ实战指南
  • RabbitMQ 进阶
ezreal_rao
2023-04-04
目录

消息何去何从

mandatory 和 immediate 是 channel.basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达目的地时将消息返回给生产者的功能。 RabbitMQ 提供的备份交换器 (Altemate Exchange) 可以将未能被交换器路由的消息 (没有绑定队列或者没有匹配的绑定) 存储起来,而不用返回给客户端。

对于初学者来说,特别容易将 mandatory 和 immediate 这两个参数混淆,而对于备份交换器更是一筹莫展,本章对此一一展开探讨。

# mandatory 参数

当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。

那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用 channel.addReturnListener 来添加 ReturnListener 监昕器实现。

使用 mandatory 参数的关键代码如代码清单 4-1 所示。

代码清单 4-1

channel.basicPublish(EXCHANGE_NAME, "", true,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() (
    public void handleReturn(int replyCode , String replyText,
                            String exchange, String routingKey,
                            AMQP.BasicProperties basicProperties,
                            byte[] body) throws IOException {
                    String message = new String(body);
                    System.out.println( "Basic.Return 返回的结果是: " +message );
                }
});
1
2
3
4
5
6
7
8
9
10
11
12

上面代码中生产者没有成功地将消息路由到队列,此时 RabbitMQ 会通过 Basic.Return 返回 "mandatory test" 这条消息,之后生产者客户端通过 ReturnListener 监昕到了这个事件,上面代码的最后输出应该是 "Basic.Retum 返回的结果是 : mandatory test"。

从 AMQP 协议层面来说,其对应的流转过程如图 4-1 所示。

mandatory 参数

# immediate 参数

当 imrnediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时, 该消息会通过 Basic.Return 返回至生产者。

概括来说 , mandatory 参数告诉服务器至少将该消息路由到一个队列中, 否则将消息返回给生产者。 imrnediate 参数告诉服务器 , 如果该消息关联的队列上有消费者, 则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0 版本开始去掉了对 imrnediate 参数的支持,对此 RabbitMQ 官方解释是 : imrnediate 参数会影响镜像队列的性能, 增加了代码复杂性 , 建议采用 TTL 和 DLX 的方法替代。(有关 TTL 和 DLX 的介绍请分别参考 4.2 节和 4.3 节。)

发送带 irnmediate 参数 (irnmediate 参数设置为 true) 的 Basic.Publish 客户端会报如下异常:

[WARN] - [An unexpected connection driver error occured (Exception message : Connection reset)] - [com.rabbitmq.client.impl.ForgivingExceptionHandler:120]
1

RabbitMQ 服务端 会报如下异常 (查看 RabbitMQ 的 运行日志,默认日志路径为 HOSTNAME.log):

=ERROR REPORT==== 25-May-2017::15:10:25 ===

Error on AMQP connection <0.25319.2> (192.168.0.2:55254->192.168.0.3:5672, vhost : ' / ', user: 'root ' , state: running) , channel 1:{amqp_error , not_imp1emented , " immediate=true" , 'basic.pub1ish ' }
1
2
3

# 备份交换器

备份交换器,英文名称为 Alternate Exchange,简称 AE,或者更直白地称之为 "备胎交换器"。 生产者在发送消息的时候如果不设置 mandatory 参数, 那么消息在未被路由的情况下将会丢失;如果设置了 mandatory 参数,那么需要添加 ReturnListener 的编程逻辑,生产者的代码将变得复杂。如果既不想复杂化生产者的编程逻辑,又不想消息丢失,那么可以使用备份交换器, 这样可以将未被路由的消息存储在 RabbitMQ 中,再在需要的时候去处理这些消息。

可以通过在声明交换器 (调用 channel.exchangeDeclare 方法) 的时候添加 alternate-exchange 参数来实现,也可以通过策略 (Policy,详细参考 6.3 节) 的方式实现。 如果两者同时使用,则前者的优先级更高,会覆盖掉 Policy 的设置。

使用参数设置的关键代码如代码清单 4-2 所示。

代码清单 4-2

Map<String , Object> 
args = new HashMap<String , Object>();
args.put("alternate-exchange", "myAe"); 
channel.exchangeDeclare("normalExchange", "direct", true, false, args); 
channel.exchangeDeclare("myAe", "fanout", true, false , null); 
channel.queueDeclare("normalQueue", true, false, false, null); 
channel.queueBind("normalQueue", "normalExchange", "normalKey"); 
channel.queueDeclare("unroutedQueue", true, false, false, null);
channel.queueBind("unroutedQueue", "myAe", "");
1
2
3
4
5
6
7
8
9

上面的代码中声明了两个交换器 nonnallixchange 和 myAe ,分别绑定了 nonnalQueue 和 unroutedQueue 这两个队列,同时将 myAe 设置为 normalExchange 的备份交换器。注意 myAe 的交换器类型为 fanout。

参考图 4-2 ,如果此时发送一条消息到 normalExchange 上,当路由键等于 "normalKey" 的 时候,消息能正确路由到 nonnalQueue 这个队列中。如果路由键设为其他值,比如 "errorKey", 即消息不能被正确地路由到与 normallixchange 绑定的任何队列上,此时就会发送给 myAe,进 而发送到 unroutedQueue 这个队列。

备份交换器

同样,如果采用 Policy 的方式来设置备份交换器,可以参考如下:

rabbitmqctl set_policy AE "^normalExchange$" `{"alternate-exchange": "myAE"}'
1

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为 fanout 类型,如若读者想设置为 direct 或者 topic 的类型也没有什么不妥。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。

考虑这样一种情况,如果备份交换器的类型是 direct, 并且有一个与其绑定的队列,假设绑定的路由键是 keyl , 当某条携带路由键为 key2 的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为 keyl,则可以存储到队列中。

对于备份交换器,总结了以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和 mandatory 参数一起使用,那么 mandatory 参数无效。
#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
小结
过期时间 (TTL)

← 小结 过期时间 (TTL)→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式