生产者确认
在使用 RabbitMQ 的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器 ,何谈持久化?
RabbitMQ 针对这个问题,提供了两种解决方式:
- 通过事务机制实现;
- 通过发送方确认 (publisher confirm) 机制实现。
# 事务机制
RabbitMQ 客户端中与事务机制相关的方法有三个: channel.txSelect、channel.txCommit 和 channel.txRollback。channel.txSelect 用于将当前的信道设置成事务模式。 channel.txCommit 用于提交事务。 channel.txRollback 用于事务回滚。在通过 channel.txSelect 方法开启事务之后,我们便可以发布消息给 RabbitMQ 了, 如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。注意这里的 RabbitMQ 中的事务机制与大多数数据库中的事务概念井不相同,需要注意区分。
关键示例代码如代码清单 4-14 所示。
代码清单 4-14
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT TEXT_PLAIN, "transaction messages".getBytes());
channel.txCommit();
2
3
对应的 go 代码为:
package main
import (
"context"
"fmt"
"strconv"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
TxPublish()
}
// 事务提交
func TxPublish() {
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
// 开启事务
ch.Tx()
// 声明队列
q, _ := ch.QueueDeclare("test_tx", true, false, false, false, nil)
if err := ch.PublishWithContext(context.Background(), "", q.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("test_tx"),
}); err != nil {
ch.TxRollback()
fmt.Println("tx rollback")
return
}
ch.TxCommit()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
上面代码对应的 AMQP 协议流转过程如图 4-8 所示。
可以发现开启事务机制与不开启 (参考图 2-10) 相比多了四个步骤:
- 客户端发送 Tx.Select. 将信道置为事务模式;
- Broker 回复 Tx.Select-Ok. 确认己将信道置为事务模式;
- 在发送完消息之后,客户端发送 Tx.Commit 提交事务;
- Broker 回复 Tx.Commit-Ok. 确认事务提交。
上面所陈述的是正常的情况下的事务机制运转过程,而事务回滚是什么样子呢?我们先来 参考下面一段示例代码 (代码清单 4-15). 来看看怎么使用事务回滚。
代码清单 4-15
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
int result = 1 / 0;
channel.txCommit() ;
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
2
3
4
5
6
7
8
9
上面代码中很明显有一个 java.lang.ArithmeticException. 在事务提交之前捕获到异常,之后显式地提交事务回滚,其 AMQP 协议流转过程如图 4-9 所示。
如果要发送多条消息,则将 channel.basicPublish 和 channel.txCommit 等方法包裹进循环内即可,可以参考如下示例代码,(代码清单 4-16) 。
代码清单 4-16
channel.txSelect();
for (int i=O ; i<LOOP_TIMES;i++) {
try {
channel.basicPublish ("exchange", "routingKey", null, ("messages" + i).getBytes());
channel.txCommit( );
}catch(Exception e) {
e.printStackTrace();
channel.txRollback();
}
}
2
3
4
5
6
7
8
9
10
对应的 go 代码:
package main
import (
"context"
"fmt"
"strconv"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
TxPublish()
}
// 事务提交
func TxPublish() {
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
// 开启事务
ch.Tx()
// 声明队列
q, _ := ch.QueueDeclare("test_tx", true, false, false, false, nil)
for _, v := range []int{1, 2, 3, 4, 5, 6} {
if err := ch.PublishWithContext(context.Background(), "", q.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("test_tx" + fmt.Sprintf("%d",v)),
}); err != nil {
ch.TxRollback()
fmt.Println("tx rollback")
return
}
}
ch.TxCommit()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务机制会 "吸干" RabbitMQ 的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从 AMQP 协议层面来看并没有更好的办法,但是 RabbitMQ 提供了一个改进方案 ,即发送方确认机制,详情请看下一节的介绍。
# 发送方确认机制
前面介绍了 RabbitMQ 可能会遇到的一个问题,即消息发送方 (生产者) 并不知道消息是否真正地到达了 RabbitMQ。随后了解到在 AMQP 协议层面提供了事务机制来解决这个问题,但是采用事务机制实现会严重降低 RabbitMQ 的消息吞吐量,这里就引入了一种轻量级的方式 -- 发送方确认 (publisher confirm) 机制 。
生产者将信道设置成 confmn (确认) 模式,一旦信道进入 confmn 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID (从 l 开始),一旦消息被投递到所有匹配的队列之后, RabbitMQ 就会发送一个确认 (Basic.Ack) 给生产者 (包含消息的唯一 ID) ,这就使得生产者知晓消息已经正确到达了目的地了。如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。 RabbitMQ 回传给生产者的确认消息中的 deliveryTag 包含了确认消息的序号,此外 RabbitMQ 也可以设置 channel.basicAck 方法中的 multiple 参数,表示到这个序号之前的所有消息都己经得到了处理,可以参考图 4-10 。注意辨别这里的确认和消费时候的确认之间的异同。
事务机制在一条消息发送之后会使发送端阻塞,以等待 RabbitMQ 的回应,之后才能继续发送下一条消息。相比之下, 发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之 后,生产者应用程序便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack (Basic.Nack) 命令,生产者应用程序同样可以在回调方法中处理该 nack 命令。
生产者通过调用 channel.confirmSelect 方法 (即 Confirm.Select 命令) 将信道设置为 confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式 。所有被发送的后续消息都被 ack 或者 nack 一次,不会出现一条消息既被 ack 又被 nack 的情况, 并且 RabbitMQ 也并没有对消息被 confirm 的快慢做任何保证。
下面看一下 publisher confirm 机制怎么运作,简要代码如代码清单 4-17 所示。
代码清单 4-17
try{
channel.confirmSelect();//将信道置为 publisher confirm 模式
//之后正常发送消息
channel.basicPublish("exchange", "routingKey", null, "publisher confirm test".getBytes());
if (!channel.waitForConfirms()) {
System.out.println("send message failed");
}
// do something else...
}catch(Exception e) {
e.printStackTrace();
}
2
3
4
5
6
7
8
9
10
11
package main
import (
"context"
"fmt"
"strconv"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
AckPublish()
}
func AckPublish() {
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
ch, _ := conn.Channel()
// 开启 comfirm 模式
// 当参数为 true 的时候,client 不会等待响应
err := ch.Confirm(false)
if err != nil {
fmt.Println("信道设置为confirm模式失败", err.Error())
}
// 监听消息是否发送成功
notifyConfirm := ch.NotifyPublish(make(chan amqp.Confirmation))
q, _ := ch.QueueDeclare("test_ack", true, false, false, false, nil)
for _, v := range []string{"a", "b", "c", "d", "e", "f"} {
ch.PublishWithContext(context.Background(), "", q.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("hello"+v),
})
}
for v := range notifyConfirm {
if v.Ack {
fmt.Println("该消息已经Ack: ",v.DeliveryTag)
} else {
fmt.Println("改消息没有Ack: ", v.DeliveryTag)
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
如果发送多条消息,只需要将 channel.basicPublish 和 channel.waitForConfirms 方法包裹在循环里面即可,可以参考事务机制,不过不需要把 channel.confirmSelect 方法包裹在循环内部。
在 publisher confirm 模式下发送多条消息的 AMQP 协议流转过程可以参考图 4-11 。
对于 channel.waitForConfirms 而言,在 RabbitMQ 客户端中它有 4 个同类的方法:
boolean waitForConfirms() throws InterruptedException
;boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException
void waitForConfirmsOrDie() throws IOException , InterruptedException
;void waitForConfirmsOrDie(long timeout) throws IOException , InterruptedException , TimeoutException
;
如果信道没有开启 publisher confirm 模式,则调用任何 waitForConfirms 方法都会报出 java.lang.IllegalStateException。对于没有参数的 waitForConfirms 方法来说, 其返回的条件是客户端收到了相应的 Basic.Ack/.Nack 或者被中断。参数 timeout 表示超时时间, 一旦等待 RabbitMQ 回应超时就会抛出 java.util.concurrent.TimeoutException 的异常。两个 waitForConfirmsOrDie 方法在接收到 RabbitMQ 返回 的 Basic.Nack 之后会抛出 java.io.IOException。业务代码可以根据自身的特性灵活地运用这四种方法来保障消息的可靠发送。
前面提到过 RabbitMQ 引入了 publisher confirm 机制来弥补事务机制的缺陷,提高了整体的吞吐量,那么我们来对比下两者之间的 QPS ,测试代码可以参考上面的示例代码 。
测试环境:客户端和 Broker 机器配置 --CPU 为 24 核、主频为 2600Hz、内存为 64GB 、硬盘为 lTB 。客户端发送的消息体大小为 10B ,单线程发送,并且消息都进行持久化处理。
测试结果如图 4-12 所示 。
图 4-12 中的横坐标表示测试的次数,纵坐标表示 QPS 。 可以发现 publisher confum 与事务机制相比, QPS 井没有提高多少,难道是 RabbitMQ 欺骗了我们?
我们再来回顾下前面的示例代码,可以发现 publisher confmn 模式是每发送一条消息后就调用 channel .waitForConfirms 方法,之后等待服务端的确认 ,这实际上是一种串行同步等待的方式。事务机制和它一样,发送消息之后等待服务端确认,之后再发送消息。两者的存储确认原理相同,尤其对于持久化的消息来说,两者都需要等待消息确认落盘之后才会返回 (调用 Linux 内核的 fsync 方法) 。在 同步等待的方式下, publisher confum 机制发送一条消息需要通信交互的命令是 2 条: Basic.Publish 和 Basic.Ack; 事务机制是 3 条: Basic.Publish 、 Tx.Commmit/.Commit-Ok (或者 Tx .Rollback/.Rollback-Ok) , 事务机制多了一个命令帧报文的交互 ,所以 QPS 会略微下降。
注意要点:
- 事务机制和 publisher confirm 机制两者是互斥的,不能共存。 如果企图将已开启事务模式的信道再设直为 publisher confmn 模式, RabbitMQ 会报错 {amqp_error, precondition_ failed , "cannot switch from tx to confirm mode" , ' confirm.select '}; 或者如果企图将已开启 publisher confum 模式的信道再设置为事务模式,RabbitMQ 也会报错:{amqp_error , precondition_failed , mode" , ' tx . select ' }.
- 事务机制和 publisher confirm 机制确保的是消息能够正确地发送至 RabbitMQ ,这里的 "发送至 RabbitMQ" 的含义是指消息被正确地发往至 RabbitMQ 的交换器,如果此交换器没有匹配的队列,那么消息也会丢失。所以在使用这两种机制的时候要确保所涉及的交换器能够有匹配的队列。 更进一步地讲,发送方要配合 mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。
publisher confrm 的优势在于并不一定需要同步确认 。 这里我们改进了一下使用方式,总结有如下两种:
- 批量 confirm 方法:每发送一批消息后,调用 channel.waitForConfirms 方法,等待服务器的确认返回 。
- 异步 confirm 方法:提供一个回调方法,服务端确认了一条或者多条消息后客户端会回调这个方法进行处理。
在批量 confirm 方法中,客户端程序需要定期或者定量 (达到多少条),亦或者两者结合起来调用 channel.waitForConfirms 来等待 RabbitMQ 的确认返回。相比于前面示例中的普通 confirm 方法,批量极大地提升了 confirm 的效率,但是问题在于出现返回 Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能应该是不升反降的。
批量 confirm 方法的示例代码如代码清单 4-18 所示 。
代码清单 4-18
try {
channel.confirmSelect();
int MsgCount = 0;
while (true) {
channel.basicPublish("exchange", "routingKey", null, "batch confirm test".getBytes());
//将发送出去 的消息存入缓存中,缓存可以是
//一个 ArrayList 或者 BlockingQueue 之类的
if (++MsgCount >= BATCH_COUNT) {
MsgCount = 0;
try {
if (channel.waitForConfirms()) {
//将缓存中的消息清空
}
//将缓存中的消息重新发送
} catch((InterruptedException e) {
e.printStackTrace();
//将缓存中 的消息重新发送
}
}
}
}catch (IOException e) {
e.printStackTrace();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
异步 confirm 方法的编程实现最为复杂。在客户端 Channel 接口中提供的 addConfirmListener 方法可以添加 ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含两个方法: handleAck 和 handleNack ,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack 。在这两个方法中都包含有一个参数 deliveryTag (在 publisher confirm 模式下用来标记消息的唯一有序序号)。我们需要为每一个信道维护一个 "unconfirm" 的消息序号集合, 每发送一条消息,集合中的元素加 1 。每当调用 ConfirmListener 中的 handleAck 方法时, "unconfirm" 集合中删掉相应的一条 (multiple 设置为 false ) 或者多条 (multiple 设置为 true ) 记录。从程序运行效率上来看,这个 "unconfrrm" 集合最好采用有序集合 SortedSet 的存储结构。事实上, Java 客户端 SDK 中的 waitForConfirms 方法也是通过 SortedSet 维护消息序号的 。 代码清单 4-19 为我们 演示了异步 confirm 的编码实现 , 其中的 confirmSet 就是一个 SortedSet 类型的集合。
代码清单 4-18
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener () {
public void handleAck(long deliveryTag , boolean multiple) throws IOException {
System.out.println("Nack , SeqNo: " + deliveryTag + " , multiple : " + multiple) ;
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear( );
} else {
confirmSet.remove(deliveryTag);
}
}
public void handleNack (long deliveryTag ,boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag - 1).clear( );
} else {
confirmSet.remove(deliveryTag);
}
//注意这里需要添加处理消息重发的场景
}
})
//下面是演示一直发送消息的场景
while (true) {
long nextSeqNo = channel.getNextPublishSeqNo();
channel.basicPublish(ConfirmConfig.exchangeName,MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
confirmSet.add(nextSeqNo) ;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
最后我们将事务、普通 confirm 批量 confirm 和异步 confirm 这 4 种方式放到一起来比较一下彼此的 QPS 。 测试环境和数据和图 4-12 中的测试相同, 具体测试对比如 图 4-13 所示。
可以看到批量 confirm 和异步 confirm 这两种方式所呈现的性能要比其余两种好得多。事务机制和普通 confirm 的方式吐吞量很低,但是编程方式简单,不需要在客户端维护状态 (这里指的是维护 deliveryTag 及缓存未确认的消息)。批量 confirm 方式的问题在于遇到 RabbitMQ 服务端返回 Basic.Nack 需要重发批量消息而导致的性能降低。异步 confirm 方式编程模型最为复杂,而且和批量 confirm 方式一样需要在客户端维护状态。在实际生产环境中采用何种方式,这里就仁者见仁智者见智了,不过强烈建议读者使用异步 confirm 的方式。