消费消息
RabbitMQ 的消费模式分两种:推 (Push) 模式和拉 ( Pull ) 模式 。 推模式采用 Basic.Consume 进行消费,而拉模式则是调用 Basic.Get 进行消费。
# 推模式
在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
2
接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现 。当调用与 Consumer 相关的 API 方法时,不同的订阅采用不同的消费者标签 (consumerTag) 来区分彼此,在同一个 Channel 中的消费者也需要通过唯一的 消费者标签以作区分 ,码如代码清单 3-9 所示。
代码清单 3-9
boolean autoAck = false ;
channel.basicQos(64);
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope ,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here . .. )
channel.basicAck(deliveryTag, false);
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
注意,上面代码中显式地设置 autoAck 为 false, 然后在接收到消息之后进行显式 ack 操作 (channel.basicAck), 对于消费者来说这个设置是非常必要的, 可以防止消息不必要地丢失。
Channel 类中 basicConsume 方法有如下几种形式:
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String Queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck , String consumerTag ,boolean noLocal, boolean exclusive, Map<Str ing, Object> arguments , Consumer callback)throws IOException;
其对应的参数说明如下所述。
- queue: 队列的名称:
- autoAck : 设置是否自动确认。建议设成 false ,即不自动确认;
- consumerTag: 消费者标签,用来区分多个消费者;
- noLocal: 设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者;
- exclusive : 设置是否排他;
- arguments : 设置消费者的其他参数;
- callback : 设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,比如 DefaultConsumer , 使用时需要客户端重写 (override) 其中的方法。
对于消费者客户端来说重写 handleDelivery 方法是十分方便的。更复杂的消费者客户端会重写更多的方法,具体如下 :
void handleConsumeOk(String consumerTag);
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag , ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);
2
3
4
5
比如 handleShutdownSignal 方法,当 Channel 或者 Connection 关闭的时候会调用。 再者, handleConsumeOk 方法会在其他方法之前调用,返回消费者标签。
重写 handleCancelOk 和 handleCancel 方法,这样消费端可以在显式地或者隐式地取 消订阅的时候调用。也可以通过 channel . basicCancel 方法来显式地取消 一个消费者的订阅;
channel.basicCancel(consumerTag);
注意上面这行代码会首先触发 handleConsumerOk 方法,之后触发 handleDelivery 方法,最后才触发 handleCancelOk 方法。
和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些 callback 会被分配到与 Channel 不同的线程池上, 这意味着消费者客户端可以安全地调用这些阻塞方法,比如 channel.queueDeclare、 channel.basicCancel 等。
每个 Channel 都拥有自己独立的线程。最常用的做法是一个 Channel 对应一个消费者, 也就是意味着消费者彼此之间没有任何关联。当然也可以在一个 Channel 中维持多个消费者, 但是要注意一个问题,如果 Channel 中的 一个消费者一直在运行,那么其他消费者的 callback 会被 "耽搁" 。
# 拉模式
这里讲一下拉模式的消费方式。通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetResponeo Channel 类的 basicGet 方法没有其他重载方法,只有 :
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
其中 queue 代表队列的名称,如果设置 autoAck 为 false,那么同样需要调用 channel.basicAck 来确认消息己被成功接收。
拉模式的关键代码如代码清单 3- 10 所示。
代码清单 3-10
GetResponse response = channel.basicGet(QUEUE NAME , false);
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
2
3
2.2.2 节中的消费者流传过程指的是推模式,这里采用的拉模式的消费方式如图 3-2 所示 (只展示消费的部分)。
注意要点:
Basic.Consume 将信道 (Channel) 置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到 Basic.Qos 的限制。如果只想从队列获得单条消息而不是持续订阅,建议还是使用 Basic.Get 进行消费。但是不能将 Basic.Get 放在一个循环里来代替 Basic.Consume ,这样做会严重影响 RabbitMQ 的性能。如果要实现高吞吐量,消费者理应使用 Basic.Consume 方法。