相关概念介绍
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说, RabbitMQ 模型更像是一种交换机模型。
RabbitMQ 的整体模型架构如图 2-1 所示 。
# 生产者和消费者
Producer: 生产者,就是投递消息的一方。
生产者创建消息,然后发布到 RabbitMQ 中。消息一般可以包含 2 个部分:消息体和标签 (Label) 。消息体也可以称之为 payload ,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息 , 比如 一个交换器的名称和 一个路由键 。 生产者把消息交由 RabbitMQ 之后会根据标签把消息发送给感兴趣 的消费者 (Consumer) 。
Consumer: 消费者,就是接收消息的一方。
消费者连接到 RabbitMQ 服务器,并订阅到队列上。当消费者消费一条消息时 , 只是消费消息的消息体 (payload) 。 在消息路由的过程中 , 消息的标签会丢弃, 存入到队列中的消息只有消息体,消费者也只会消费到消息体 , 也就不知道消息的生产者是谁,当然消费者也不需要知道。
Broker: 消息中间件的服务节点。
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点 ,或者 RabbitMQ 服务实例 。 大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
图 2-2 展示了生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整个流程。
首先生产者将业务方数据进行可能的包装,之后封装成消息,发送 (AMQP 协议里这个动作对应的命令为 Basic.Publish) 到 Broker 中 。 消费者订阅并接收消息 (AMQP 协议里这个动作对应的命令为 Basic.Consume 或者 Basic.Get),经过可能的解包处理得到原始的数据,之后再进行业务处理逻辑。这个业务处理逻辑并不一定需要和接收消息的逻辑使用同一个线程。消费者进程可以使用一个线程去接收消息,存入到内存中,比如使用 Java 中的 BlockQueue 。 业务处理逻辑使用另一个线程从内存中读取数据,这样可以将应用进一步解耦,提高整个应用的处理效率。
# 队列
Queue: 队列,是 RabbitMQ 的内部对象,用于存储消息。参考图 2-1,队列可以用图 2-3 表示。
RabbitMQ 中消息都只能存储在队列中,这一点和 Katka 这种消息中间件相反 。 Katka 将消息存储在 topic (主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息井最终投递到队列中,消费者可以从队列中获取消息并消费。
多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊 (Round-Robin,即轮询) 给多个消费者进行处理,而不是每个消费者都收到所有的消息井处理,如图 2-4 所示 。
RabbitMQ 不支持队列层面的广播消费,如果需要广播消费,需要在其上进行二次开发,处理逻辑会变得异常复杂,同时也不建议这么做。
# 交换器、路由键、绑定
Exchange: 交换器。在图 2-4 中我们暂时可以理解成生产者将消息投递到队列中,实际上这个在 RabbitMQ 中不会发生。真实情况是,生产者将消息发送到 Exchange (交换器,通常也可以用大写的 "X" 来表示),由交换器将消息路由到一个或者多个队列中。如果路由不到,或许会返回给生产者,或许直接丢弃。这里可以将 RabbitMQ 中的交换器看作一个简单的实体,更多的细节会在后面的章节中有所涉及。
交换器的具体示意图如图 2-5 所示。
RabbitMQ 中的交换器有四种类型,不同的类型有着不同的路由策略,这将在下一节的交换器类型 (Exchange Types) 中介绍 。
RoutingKey: 路由键。生产者将消息发给交换器的时候, 一般会指定一个 RoutingKey ,用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。
在交换器类型和绑定键 (BindingKey) 固定的情况下,生产者可以在发送消息给交换器时, 通过指定 RoutingKey 来决定消息流向哪里。
Binding: 绑定。 RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 (BindingKey) ,这样 RabbitMQ 就知道如何正确地将消息路由到队列了,如图 2-6 所示。
生产者将消息发送给交换器时,需要一个 RoutingKey ,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的 BindingKey。BindingKey 并不是在所有的情况下都生效,它依赖于交换器类型, 比如 fanout 类型的交换器就会无视 BindingKey, 而是将消息路由到所有绑定到该交换器的队列中。
对于初学者来说,交换器、路由键、绑定这几个概念理解起来会有点晦涩,可以对照着代码清单 1-1 来加深理解。
沿用本章开头的比喻,交换器相当于投递包裹的邮箱 , RoutingKey 相当于填写在包裹上的地址, BindingKey 相当于包裹的目的地,当填写在包裹上的地址和实际想要投递的地址相匹配时,那么这个包裹就会被正确投递到目的地,最后这个目的地的 "主人" 一一队列可以保留这个包裹。如果填写的地址出错,邮递员不能正确投递到目的地,包裹可能会回退给寄件人,也有可能被丢弃。
提示
比较形象的解释可以看这个 blog:RabbitMQ Exchange Queue RoutingKey BindingKey 解析 (opens new window)
有经验的读者可能会发现,在某些情形下, RoutingKey 与 BindingKey 可以看作同一个东西。 代码清单 2-1 所展示的是代码清单 1-1 中的部分代码:
代码清单 2-1
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key
exchange := "test_direct_exchange"
routingKey := "test.direct"
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
//注意,在生产者里声不声明(创建)交换机都可以。这里声明,是为了防止消费者没有启动或者这个交换机原先不存在,导致消息投递丢失。
err = ch.ExchangeDeclare(
exchange, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
msg := "Hello World RabbitMQ Direct Exchange Message ... "
// 5. 发送消息
//exchange:要发送到的交换机名称,对应图中exchangeName。
//key:路由键,对应图中RoutingKey。
//mandatory:消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式, 设置为 true 表示将消息返回到生产者,否则直接丢弃消息。直接false,不建议使用。
//immediate :参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。直接false,不建议使用。RabbitMQ 3.0版本开始去掉了对immediate参数的支持。
//msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多参数,这里只强调几个参数,其他参数暂时列出,但不解释。
err = ch.Publish(
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", msg)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 1. 建立RabbitMQ连接
conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 2. 创建channel
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 3. 声明exchange,routing key,queue name
exchange := "test_direct_exchange"
routingKey := "test.direct"
queueName := "test_direct"
// 4. 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.ExchangeDeclare(
exchange, // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 5. 声明(创建)一个队列
//name:队列名称
//durable:是否持久化,true为是。持久化会把队列存盘,服务器重启后,不会丢失队列以及队列内的信息。(注:1、不丢失是相对的,如果宕机时有消息没来得及存盘,还是会丢失的。2、存盘影响性能。)
//autoDelete:是否自动删除,true为是。至少有一个消费者连接到队列时才可以触发。当所有消费者都断开时,队列会自动删除。
//exclusive:是否设置排他,true为是。如果设置为排他,则队列仅对首次声明他的连接可见,并在连接断开时自动删除。(注意,这里说的是连接不是信道,相同连接不同信道是可见的)。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
q, err := ch.QueueDeclare(
queueName, // name
false, // durable
false, // delete when usused
true, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 6. 队列绑定
//name:队列名称
//key:BandingKey,表示要绑定的键。
//exchange:交换器名称
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
err = ch.QueueBind(
q.Name, // queue name
routingKey, // routing key
exchange, // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
// 7. RMQ Server主动把消息推给消费者
//queue:队列名称。
//consumer:消费者标签,用于区分不同的消费者。
//autoAck:是否自动回复ACK,true为是,回复ACK表示告诉服务器我收到消息了。建议为false,手动回复,这样可控性强。
//exclusive:设置是否排他,排他表示当前队列只能给一个消费者使用。
//noLocal:如果为true,表示生产者和消费者不能是同一个connect。
//nowait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Make sure to add code blocks to your code group
以上代码声明了一个 direct 类型的交换器 (交换器的类型在下一节会详细讲述),然后将交换器和队列绑定起来。注意这里使用的字样是 "ROUTING_KEY" ,在本该使用 BindingKey 的 channel.queueBind 方法中却和 channel.basicPublish 方法同样使用了 RoutingKey, 这样做的潜台词是:这里的 RoutingKey 和 BindingKey 是同一个东西。在 direct 交换器类型下, RoutingKey 和 BindingKey 需要完全匹配才能使用,所以上面代码中采用了此种写法会显得方便许多。
但是在 topic 交换器类型下, RoutingKey 和 BindingKey 之间需要做模糊匹配,两者并不是相同的。
BindingKey 其实也属于路由键中的一种,官方解释为: the routing key to use for the binding。 可以翻译为:在绑定的时候使用的路由键。大多数时候,包括官方文档和 RabbitMQ Java API 中都把 BindingKey 和 RoutingKey 看作 RoutingKey,为了避免混淆,可以这么理解:
- 在使用绑定的时候,其中需要的路由键是 BindingKey 。涉及的客户端方法如: channel.exchangeBind、 channel.queueBind ,对应的 AMQP 命令 (详情参见 2.2 节) 为 Exchange.Bind、 Queue.Bind。
- 在发送消息的时候,其中需要的路由键是 RoutingKey 。涉及的客户端方法如 channel.basicPublish,对应的 AMQP 命令为 Basic.Publish。
由于某些历史的原因,包括现存能搜集到的资料显示:大多数情况下习惯性地将 BindingKey 写成 RoutingKey ,尤其是在使用 direct 类型的交换器的时候。本文后面的篇幅中也会将两者合称为路由键,读者需要注意区分其中的不同,可以根据上面的辨别方法进行有效的区分。
# 交换器类型
RabbitMQ 常用的交换器类型有 fanout、 direct、 topic、 headers 这四种 。 AMQP 协议里还提到另外两种类型: System 和自定义,这里不予描述。对于这四种类型下面一一阐述。
# fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
# direct
direct 类型的交换器路由规则也很简单,它会把消息路由到那些 BindingKey 和 RoutingKey 完全匹配的队列中。
以图 2-7 为例,交换器的类型为 direct,如果我们发送一条消息,并在发送消息的时候设置路由键为 "warning" ,则消息会路由到 Queuel 和 Queue2,对应的示例代码如下:
如果在发送消息的时候设置路由键为 info
或者 debug
,消息 只会路由到 Queue2 。 如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
# topic
前面讲到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey ,但是这种严 格的匹配方式在很多情况下不能满足实际业务的需求。 ωpic 类型的交换器在匹配规则上进行了 扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队 列中,但这里的匹配规则有些不同,它约定:
- RoutingKey 为一个点号
.
分隔的字符串 (被点号.
分隔开的每一段独立的字符串称为一个单词),如com.rabbitmq.client
、java.util.concurrent
、com.hidden.client
。 - BindingKey 和 RoutingKey 一样也是点号
.
分隔的字符串; - BindingKey 中可以存在两种特殊字符串
*
和#
,用于做模糊匹配,其中#
用于匹配一个单词,*
用于匹配多规格单词 (可以是零个)。
以图 2-8 中的配置为例:
- 路由键为
com.rabbitmq.client
的消息会同时路由到 Queue1 和 Queue2; - 路由键为
com.hidden.client
的消息只会路由到 Queue2 中; - 路由键为
com.hidden.demo
的消息只会路由到 Queue2 中; - 路由键为
java.rabbitmq.demo
的消息只会路由到 Queue1 中; - 路由键为
java.util.concurrent
的消息将会被丢弃或者返回给生产者 (需要设置 mandatory 参数) ,因为它没有匹配任何路由键。
# headers
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时制定一组键值对, 当发送消息到交换器时, RabbitMQ 会获取到该消息的 headers (也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列 。 headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
# RabbitMQ 运转流程
了解了以上的 RabbitMQ 架构模型及相关术语,再来回顾整个消息队列的使用过程。在最初状态下,生产者发送消息的时候 (可依照图 2-1):
- 生产者连接到 RabbitMQ Broker, 建立一个连接 (Connection) ,开启一个信道 (Channel) (详细内容请参考 3.1 节) 。
- 生产者声明一个交换器,并设置相关属性,比如交换机类型、是否持久化等 (详细内容请参考 3.2 节) 。
- 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等 (详细内容请参考 3.2 节) 。
- 生产者通过路由键将交换器和队列绑定起来 (详细内容请参考 3.2 节)。
- 生产者发送消息至 RabbitMQ Broker,其中包含路由键、交换器等信息 (详细内容请参 考 3.3 节)。
- 相应的交换器根据接收到的路由键查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者 (详细内容请参 考 4.1 节)。
- 关闭信道。
- 关闭连接。
注意
3 和 4 步在 direct 交换机类型时,是在消费者步骤实现的。不太清楚本书为什么会在生产者实现。
消费者接收消息的过程:
- 消费者连接到 RabbitMQ Broker,建立一个连接 (Connection) ,开启一个信道 (Channel) 。
- 消费者向 RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作 (详细内容请参考 3 .4 节)。
- 等待 RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
- 消费者确认 (ack) 接收到的消息。
- RabbitMQ 从队列中删除相应己经被确认的消息。
- 关闭信道。
- 关闭连接。
如图 2-9 所示,我们又引入了两个新的概念: Connection 和 Channel。我们知道无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。 一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道 (Channel) ,每个信道都会被指派一个唯一的 ID 。信道是建立在 Connection 之上的虚拟连接, RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。
我们完全可以直接使用 Connection 就能完成信道的工作,为什么还要引入信道呢?试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection ,也就是许多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用类似 NIO1 (Non-blocking I/O) 的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理 。
1: NIO ,也称非阻塞 I/O , 包含三大核心部分:Channel (信道)、 Buffer (缓冲区)和 Selector (选择器)。 NIO 基于 Channel 和 Buffer 进行操作,数据总是从信道读取数据到缓冲区中,或者从缓冲区写入到信道中。 Selector 用于监听多个信道的事件(比如连接打开,数据到达等)。因此,单线程可以监听多个数据的信道。 NIO 中有一个很有名的 Reactor 模式,有兴趣的读者可以深入研究。
每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是当信道本身的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。 此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中, 至于这些相关的调优策略需要根据业务自身的实际情况进行调节,更多内容可以参考第 9 章。
信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。在代码清单 1-1 中也可以看出一些端倪,比如 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish 和 channel.basicConsume 等方法。 RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令,在下面的小节中将会为大家一一展开。