消费端的确认与拒绝
为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制 (message acknowledgement) 。 消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存 (或者磁盘) 中移去消息 (实质上是先打上删除标记,之后再删除) 。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的消息置为确认,然后从内存 (或者磁盘) 中删除,而不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置 autoAck 参数为 false ,消费者就有足够的时间处理消息 (任务) ,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题 , 因为 RabbitMQ 会一直等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
当 autoAck 参数置为 false ,对于 RabbitMQ 服务端而言 ,队列中的消息分成了两个部分:一部分是等待投递给消费者的消息;一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。 如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。
RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
RabbtiMQ 的 Web 管理平台 (详细参考第 5.3 节) 上可以看到当前队列中的 "Ready" 状态 和 "Unacknowledged" 状态的消息数,分别对应上文中的等待投递给消费者的消息数和己经投 递给消费者但是未收到确认信号的消息数,参考图 3-3。
也可以通过相应的命令来查看上述信息:
root@091f49ee4238:/# rabbitmqctl list_queues name messages_ready
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready
test_queue 0
root@091f49ee4238:/#
2
3
4
5
6
在消费者接收到消息后,如果想明确拒绝当前的消息而不是确认,那么应该怎么做呢?RabbitMQ 在 2.0.0 版本开始引入了 Basic.Reject 这个命令,消费者客户端可以调用与其对应的 channel.basicReject 方法来告诉 RabbitMQ 拒绝这个消息。
Channel 类中的 basicReject 方法定义如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
其中 deliveryTag 可以看作消息的编号 ,它是一个 64 位的长整型值,最大值是 9223372036854775807 。如果 requeue 参数设置为 true ,则 RabbitMQ 会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果 requeue 参数设置为 false ,则 RabbitMQ 立即会把消息从队列中移除,而不会把它发送给新的消费者。
Basic.Reject 命令一次只能拒绝一条消息 ,如果想要批量拒绝消息 ,则可以使用 Basic.Nack 这个命令。 消费者客户端可以调用 channel.basicNack 方法来实现,方法定义如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
其中 deliveryTag 和 requeue 的含义可以参考 basicReject 方法。 multiple 参数设置为 false 则表示拒绝编号为 deliveryTag 的这一条消息,这时候 basicNack 和 basicReject 方法一样;multiple 参数设置为 true 则表示拒绝 deliveryTag 编号之前所有未被当前消费者确认的消息。
注意要点:
将 channel.basicReject 或者 channel.basicNack 中的 requeue 设直为 false ,可以启用 "死信队列" 的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。 详细内容可以参考 4.3 节。
对于 requeue ,AMQP 中还有一个命令 Basic.Recover 具备可重入队列的特性 。其对应的客户端方法为:
Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
这个 channel.basicRecover 方法用来请求 RabbitMQ 重新发送还未被确认的消息 。 如果 requeue 参数设置为 true ,则未被确认的消息会被重新加入到队列中,这样对于同一条消息 来说,可能会被分配给与之前不同的消费者。如果 requeue 参数设置为 false ,那么同 一条消息会被分配给与之前相同的消费者。默认情况下,如果不设置 requeue 这个参数,相当于 channel.basicRecover (true) ,即 requeue 默认为 true。