Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

  • 客户端开发向导

  • RabbitMQ 进阶

    • 消息何去何从
    • 过期时间 (TTL)
    • 死信队列
    • 延迟队列
    • 优先级队列
    • RPC 实现
    • 持久化
    • 生产者确认
    • 消费端要点介绍
      • 消息分发
      • 消息顺序性
      • 弃用 QueueingConsumer
    • 消息传输保障
    • 小结
  • RabbitMQ 管理

  • RabbitMQ实战指南
  • RabbitMQ 进阶
ezreal_rao
2023-04-18
目录

消费端要点介绍

3.4 节和 3.5 节介绍了如何正确地消费消息。消费者客户端可以通过推模式或者拉模式的方式来获取井消费消息,当消费者处理完业务逻辑需要手动确认消息己被接收,这样 RabbitMQ 才能把当前消息从队列中标记清除。当然如果消费者由于某些原因无法处理当前接收到的消息, 可以通过 channel.basicNack 或者 channel.basicReject 来拒绝掉。

这里对于 RabbitMQ 消费端来说,还有几点需要注意:

  • 消息分发;
  • 消息顺序性;
  • 弃用 QueueingConsumer 。

# 消息分发

当 RabbitMQ 队列拥有多个消费者时 ,队列收到的消息将以轮询 (round-robin) 的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。

很多时候轮询的分发机制也不是那么优雅。默认情况下,如果有 n 个消费者,那么 RabbitMQ 会将第 m 条消息分发给第 m% n (取余的方式) 个消费者, RabbitMQ 不管消费者是否消费并己经确认 (Basic.Ack) 了消息。试想一下,如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因 (比如业务逻辑简单、机器性能卓越等) 很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。

那么该如何处理这种情况呢?这里就要用到 channel.basicQos (int prefetchCount) 这个方法,如前面章节所述, channel.basicQos 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。

举例说明,在订阅消费队列之前,消费端程序调用了 channel.basicQos (5) ,之后订阅了某个队列进行消费。 RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。 直到消费者确认了某条消息之后, RabbitMQ 将相应的计数减 1,之后消费者可以继续接收消息, 直到再次到达计数上限。这种机制可以类比于 TCP/IP 中的 "滑动窗口" 。

注意要点: Basic.Qos 的使用对于拉模式的消费方式无效.

channel.basicQos 有三种类型的重载方法:

  • void basicQos(int prefetchCount) throws IOException;
  • void basicQos(int prefetchCount, boo1ean globa1) throws IOException;
  • void basicQos(int prefetchSize , int prefetchCount , boo1ean global) throws IOException ;

前面介绍的都只用到了 prefetchCount 这个参数,当 prefetchCount 设置为 0 则表示没有上限。还有 prefetchSize 这个参数表示消费者所能接收未确认消息的总体大小的上限, 单位为 B ,设置为 0 则表示没有上限。

对于一个信道来说,它可以同时消费多个队列,当设置了 prefetchCount 大于 0 时,这个信道需要和各个队列协调以确保发送的消息都没有超过所限定的 prefetchCount 的值,这样会 使 RabbitMQ 的性能降低,尤其是这些队列分散在集群中的多个 Broker 节点之中。RabbitMQ 为了提升相关的性能,在 AMQPO-9-1 协议之上重新定义了 global 这个参数,对比如表 4- 1 所示。

global参数的对比

前面章节中的 channel.basicQos 方法的示例都是针对单个消费者的,而对于同一个信道上的多个消费者而言,如果设置了 prefetchCount 的值,那么都会生效。代码清单 4-20 示例中有两个消费者,各自的能接收到的未确认消息的上限都为 10 。

代码清单 4-20

Channel channel = 
Consumer consumer1 = . ..; 
Consumer consumer2 = ...; 
channel.basicQos(10) ; // Per consumer 1imit 
channel.basicConsume("my-queue1" , false, consumer1); 
channel.basicConsume("my-queue2" , false, consumer2);
1
2
3
4
5
6

如果在订阅消息之前,既设置了 global 为 true 的限制,又设置了 global 为 false 的限制,那么哪个会生效呢?RabbitMQ 会确保两者都会生效。举例说明,当前有两个队列 queue1 和 queue2: queue1 有 10 条消息,分别为 1 到 10; queue2 也有 10 条消息,分别为 11 到 20 。有两个消费者分别消费这两个队列,如代码清单 4-21 所示。

代码清单 4-21

Channel channel = . ..; 
Consumer consumerl = ...; 
Consumer consumer2 = ...; 
channel.basicQos(3 , false); // Per consumer limit 
channel.basicQos(5 , true); // Per channel limit 
channel.basicConsume("queuel", false, consumerl) ; 
channel.basicConsume("queue2", false , consumer2) ;
1
2
3
4
5
6
7

那么这里每个消费者最多只能收到 3 个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为 5 。在未确认消息的情况下,如果 consumerl 接收到了消息 1 、 2 和 3 ,那么 consumer2 至多只能收到 11 和 12。如果像这样同时使用两种 global 的模式,则会增加 RabbitMQ 的负载,因为 RabbitMQ 需要更多的资源来协调完成这些限制。如无特殊需要,最好只使用 global 为 false 的设置,这也是默认的设置。

# 消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为 msgl 、 msg2、 msg3 ,那么消费者必然也是按照 msgl 、 msg2 、 msg3 的顺序进行消费的。

目前很多资料显示 RabbitMQ 的消息能够保障顺序性,这是不正确的,或者说这个观点有很大的局限性。在不使用任何 RabbitMQ 的高级特性 ,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。

那么哪些情况下 RabbitMQ 的消息顺序性会被打破呢?下面介绍几种常见的情形。

如果生产者使用了事务机制,在发送消息之后遇到异常进行了事务回滚,那么需要重新补偿发送这条消息,如果补偿发送是在另一个线程实现的,那么消息在生产者这个源头就出现了错序。同样,如果启用 publisher confirm 时,在发生超时、中断,又或者是收到 RabbitMQ 的 Basic.Nack 命令时,那么同样需要补偿发送,结果与事务机制一样会错序。或者这种说法有些牵强,我们可以固执地认为消息的顺序性保障是从存入队列之后开始的,而不是在发送的时候开始的。

考虑另一种情形,如果生产者发送的消息设置了不同的超时时间,井且也设置了死信队列, 整体上来说相当于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。

再考虑一种情形,如果消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。

如果一个队列按照前后顺序分有 msg1 , msg2、 msg3 、 msg4 这 4 个消息,同时有 ConsumerA 和 ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中, ConsumerA 中的消息为 msg1 和 msg3 , ConsumerB 中的消息为 msg2、 msg4。 ConsumerA 收到消息 msg1 之后并不想处理而调用了 Basic.Nack/.Reject 将消息拒绝,与此同时将 requeue 设置为 true ,这样这条消息就可以重新存入队列中。 消息 msg1 之后被发送到了 ConsumerB 中,此时 ConsumerB 己经消费了 msg2 、 msg4 ,之后再消费 msg 1. 这样消息顺序性也就错乱了。或者消息 msg1 又重新发往 ConsumerA 中,此时 ConsumerA 己经消费了 msg3 , 那么再消费 msg1 ,消息顺序性也无法得到保障。同样可以用在 Basic.Recover 这个 AMQP 命令中 。

包括但不仅限于以上几种情形会使 RabbitMQ 消息错序 。 如果要保证消息的顺序性,需要业务方使用 RabbitMQ 之后做进一步的处理,比如在消息体内添加全局有序标识 (类似 Sequence ID) 来实现 。

# 弃用 QueueingConsumer

在前面的章节中所介绍的订阅消费的方式都是通过继承 DefaultConsumer 类来实现的 。 在 1.4.4 节提及了 QueueingConsumer 这个类,并且建议不要使用这个类来实现订阅消费。 QueueingConsumer 在 RabbitMQ 客户端 3.x 版本中用得如火如荼, 但是在 4.x 版本开始就被标记为 @Deprecated,想必这个类中有些无法弥补的缺陷。

不妨先看一下 QueueingConsumer 的用法,示例代码如代码清单 4-22 所示 。

代码清单 4-22

QueueingConsumer consumer = new QueueingConsumer(channel);
//channel.basicQos(64);// 使用 QueueingConsumer 的时候一定要添加 !
channel.basicConsume(QUEUE_NAME, false, "consumer_zzh", consumer);
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody()); 
    System.out.println("[X] Received '" + message + "'"); 
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false) ;
}
1
2
3
4
5
6
7
8
9

乍一看也没什么问题,而且实际生产环境中如果不是太 "傲娇" 地使用也不会造成什么大问题。 QueueingConsumer 本身有几个大缺陷 ,需要读者在使用时特别注意。首当其冲的就是内存溢出的问题,如果由于某些原因,队列之中堆积了比较多的消息,就可能导致消费者客户端内存溢出假死,于是发生恶性循环,队列消息不断堆积而得不到消化。

采用代码清单 4-22 中的代码进行演示, 首先向一个队列发送 200 多 MB 的消息 , 然后进行消费。在客户端调用 channel.basicConsume 方法订阅队列的时候, RabbitMQ 会持续地将消息发往 QueueingConsumer 中, QueueingConsumer 内部使用 LinkedBlockingQueue 来缓存这些消息。 通过 NisualVM 可以看到堆内存的变化,如图 4-14 所示。

堆内存的变化

由图 4-1 4 可以看到堆内存一直在增加,这里只测试了发送 200MB 左右的消息, 如果发送更多的消息,那么这个堆内存会变得更大,直到出现 java.lang.OutOfMemoryError 的报错。

这个内存溢出的问题可以使用 Basic.Qos 来得到有效的解决 , Basic.Qos 可以限制某个消费者所保持未确认消息的数量, 也就是间接地限制了 Queuei ngConsumer 中的 LinkedBlockingQueue 的大小 。注意一定要在调用 Basic.Consume 之前调用 Basic.Qos 才能生效。

QueueingConsumer 还包含 (但不仅限于) 以下一些缺陷:

  • QueueingConsumer 会拖累同一个 Connection 下的所有信道,使其性能降低;
  • 同步递归调用 QueueingConsumer 会产生死锁:
  • RabbitMQ 的自动连接恢复机制 (automatic connection recovery) 不支持 Queueing Consumer 的这种形式:
  • QueueingConsumer 不是事件驱动的。

为了避免不必要的麻烦,建议在消费的时候尽量使用继承 DefaultConsumer 的方式,具体使用方式可以参考代码清单 1-2 和代码清单 3-9。

#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
生产者确认
消息传输保障

← 生产者确认 消息传输保障→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式