使用交换器和队列
交换器和队列是 AMQP 中 high-level 层面的构建模块,应用程序需确保在使用它们的时候就已经存在了,在使用之前需要先声明 (declare) 它们。
代码清单 3-6 演示了如何声明一个交换器和队列:
代码清单 3-6
channel.exchangeDeclare(exchangeName, "direct", true) ;
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
2
3
上面创建了一个持久化的、非自动删除的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列 (此队列的名称由 RabbitMQ 自动生成)。这里的交换器和队列也都没有设置特殊的参数。
上面的代码也展示了如何使用路由键将队列和交换器绑定起来。上面声明的队列具备如下特性:只对当前应用中同一个 Connection 层面可用,同一个 Connection 的不同 Channel 可共用,并且也会在应用连接断开时自动删除。
如果要在应用中共享一个队列,可以做如下声明,如代码清单 3-7 所示。
代码清单 3-7
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
2
3
这里的队列被声明为持久化的 、非排他的、非自动删除的,而且也被分配另一个确定的己知的名称 (由客户端分配而非 RabbitMQ 自动生成)。
注意: Channel 的 API 方法都是可以重载的,比如 exchangeDeclare、 queueDeclare. 根据参数不同,可以有不同的重载形式,根据自身的需要进行调用。
生产者和消费者都可以声明一个交换器或者队列。如果尝试声明一个已经存在的交换器或者队列, 只要声明的参数完全匹配现存的交换器或者队列,RabbitMQ 就可以什么都不做, 并成功返回。 如果声明的参数不匹配则会抛出异常。
注意
以下 go 代码内容不属于本书,为本人新增部分。
go 声明一个队列和交换机。
// 3. 声明exchange,routing key
exchange := "test_direct_exchange"
routingKey := "test.direct"
// 声明(创建)一个交换机
//name:交换器的名称。
//kind:也叫作type,表示交换器的类型。有四种常用类型:direct、fanout、topic、headers。
//durable:是否持久化,true表示是。持久化表示会把交换器的配置存盘,当RMQ Server重启后,会自动加载交换器。
//autoDelete:是否自动删除,true表示是。至少有一条绑定才可以触发自动删除,当所有绑定都与交换器解绑后,会自动删除此交换器。
//internal:是否为内部,true表示是。客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器。
//noWait:是否非阻塞,true表示是。阻塞:表示创建交换器的请求发送后,阻塞等待RMQ Server返回信息。非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
//args:直接写nil,没研究过,不解释。
//注意,在生产者里声不声明(创建)交换机都可以。这里声明,是为了防止消费者没有启动或者这个交换机原先不存在,导致消息投递丢失。
_ = ch.ExchangeDeclare(
exchange,
"direct",
false,
true,
false,
false,
nil,
)
// 5. 发送消息
//exchange:要发送到的交换机名称,对应图中exchangeName。
//key:路由键,对应图中RoutingKey。
//mandatory:消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器之后无法路由到队列的情况对消息的处理方式, 设置为 true 表示将消息返回到生产者,否则直接丢弃消息。直接false,不建议使用。
//immediate :参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递:如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。直接false,不建议使用。RabbitMQ 3.0版本开始去掉了对immediate参数的支持。
//msg:要发送的消息,msg对应一个Publishing结构,Publishing结构里面有很多参数,这里只强调几个参数,其他参数暂时列出,但不解释。
err = ch.Publish(
exchange, // exchange
routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msg),
})
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# exchangeDeclare 方法详解
exchangeDeclare 有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。
Exchange.DeclareOk exchangeDeclare(String exchange,
String type, boolean durable, boolean autoDelete, boolean internal,
Map<String, Object> arguments) throws IOException;
2
3
golang 对应的方法:
func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
return err
}
return ch.call(
&exchangeDeclare{
Exchange: name,
Type: kind,
Passive: false,
Durable: durable,
AutoDelete: autoDelete,
Internal: internal,
NoWait: noWait,
Arguments: args,
},
&exchangeDeclareOk{},
)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
这个方法的返回值是 Exchange.DeclareOK,用来标识成功声明了一个交换器。
各个参数详细说明如下所述。(golang 参数同理)
- exchange: 交换器的名称。
- type : 交换器的类型,常见的如 fanout、 direct、 topic , 详情参见 2. 1.4 节。
- durable: 设置是否持久化 。 durable 设置为 true 表示持久化, 反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
- autoDelete: 设置是否自动删除。 autoDelete 设置为 true 则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为: "当与此交换器连接的客户端都断开时 , RabbitMQ 会自动删除本交换器"。
- internal : 设置是否是内置的。如果设置为 true ,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
- argument : 其他一些结构化参数,比如 alternate-exchange (有关 alternateexchange 的详情可以参考 4. 1.3 节)。
exchangeDe clare 的其他重载方法如下 :
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException
;Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durab1e) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, booleandurable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
与此对应的, 将第二个参数 Stringtype 换成 BuiltInExchangeType type 对应的几个重载方法 (不常用);
- Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOE xception;
- Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;
- Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map< String, Object> arguments) throws IOException;
- Exchange.DeclareOk exchangeDec1are(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
与 exchangeDeclare 师出同门的还有几个方法, 比如 exchangeDeclareNoWait 方法, 具体定义如下 (当然也有 BuiltExchangeType 版的, 这里就不展开了);
void exchangeDeclareNoWait(String exchange, String type, boolean durab1e, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
这个 exchangeDeclareNoWait 比 exchangeDeclare 多设置了 一个 nowait 参数,这个 nowait 参数指的是 AMQP 中 Exchange.Declare 命令的参数, 意思是不需要服务器返回,注意这个方法的返回值是 void ,而普通的 exchangeDeclare 方法的返回值是 Exchange.DeclareOk,意思是在客户端声明了一个交换器之后,需要等待服务器的返回 (服务器会返回 Exchange.Declare-Ok 这个 AMQP 命令)。
针对 "exchangeDeclareNoWait 不需要服务器任何返回值" 这一点,考虑这样一种情况,在声明完一个交换器之后 (实际服务器还并未完成交换器的创建), 那么此时客户端紧接着使用这个交换器,必然会发生异常。如果没有特殊的缘由和应用场景,并不建议使用这个方法。
这里还有师出同门的另一个方法 exchangeDeclarePassive,这个方法的定义如下:
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
这个方法在实际应用过程中还是非常有用的,它主要用来检测相应的交换器是否存在。如果存在则正常返回; 如果不存在则抛出异常:404 channel exception,同时 Channel 也会被关闭。
有声明创建交换器的方法,当然也有删除交换器的方法。相应的方法如下:
- Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
- void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
- Exchange.DeleteOk exchangeDelete(String exchange, boo1ean ifUnused) throws IOException;
其中 exchange 表示交换器的名称,而 ifUnused 用来设置是否在交换器没有被使用的情况下删除 。 如果 isUnused 设置为 true ,则只有在此交换器没有被使用的情况下才会被删除; 如果设置 false ,则无论如何这个交换器都要被删除。
# queueDeclare 方法详解
queueDeclare 相对于 exchangeDeclare 方法而言,重载方法的个数就少很多,它只有两个重载方法:
- Queue.DeclareOk queueDec1are() throwsIOException;
- Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的 (类似这种 amq.gen-LhQzlgv3GhDOv8PIDabOXA 名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。
golang 的队列声明也是类似:
func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
if err := args.Validate(); err != nil {
return Queue{}, err
}
req := &queueDeclare{
Queue: name,
Passive: false,
Durable: durable,
AutoDelete: autoDelete,
Exclusive: exclusive,
NoWait: noWait,
Arguments: args,
}
res := &queueDeclareOk{}
if err := ch.call(req, res); err != nil {
return Queue{}, err
}
if req.wait() {
return Queue{
Name: res.Queue,
Messages: int(res.MessageCount),
Consumers: int(res.ConsumerCount),
}, nil
}
return Queue{Name: name}, nil
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
方法的参数详细说明如下所述。
- queue : 队列的名称。
- durable: 设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
- exclusive : 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意 三点:排他队列是基于连接 (Connection) 可见的,同一个连接的不同信道 (Channel) 是可以同时访问同一连接创建的排他队列;"首次" 是指如果一个连接己经声明了 一个 排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队 列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。
- autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。
- argurnents: 设置队列的其他一些参数,如 x-message-ttl、x-expires、x -max-length、x-max-length-bytes、x-dead-letter-exchange、 x-dead-letter-routing-key、x-max-priority 等。
注意要点:
生产者和消费者都能够使用 queueDeclare 来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。必须先取消订阅,然后将信道置为 "传输" 模式,之后才能声明队列。
对应于 exchangeDeclareNoWait 方法,这里也有一个 queueDeclareNoWait 方法:
void queueDec1areNoWait(String queue, boo1ean durab1e, boo1ean exc1usive, boo1ean autoDe1ete, Map<String , Object> arguments) throws IOExcepttion;
方法的返回值也是 void ,表示不需要 服务端的任何返回。同样也需要注意,在调用完 queueDeclareNoWait 方法之后,紧接着使用声明的队列时有可能会发生异常情况。
同样这里还有一个 queueDeclarePassive 的方法,也比较常用。这个方法用来检测相应的队列是否存在。 如果存在则正常返回,如果不存在则抛出异常: 404 channel exception,同时 Channel 也会被关闭。方法定义如下:
Queue.Dec1areOk queueDec1arePassive(String queue) throws IOException;
与交换器对应,关于队列也有删除的相应方法:
- Queue.De1eteOk queueDe1ete(String queue)throws IOException;
- Queue.De1eteOk queueDe1ete(String queue, boo1ean ifUnused, boolean ifEmpty) throws IOException;
- void queueDe1eteNoWait(String queue, boo1ean ifUnused, boo1ean ifEmpty) throws IOException;
其中 queue 表示队列的名称,ifUnused 可以参考上一小节的交换器。ifEmpty 设置为 true 表示在队列为空 (队列里面没有任何消息堆积) 的情况下才能够删除。
与队列相关的还有一个有意思的方法 queuepurge ,区别于 queueDelete ,这个方法用来清空队列中的内容,而不删除队列本身,具体定义如下:
Queue.PurgeOk queuePurge(String queue) throws IOException;
# queueBind 方法详解
将队列和交换器绑定的方法如下 ,可以与前两节中的方法定义进行类比。
- Queue.BindOk queueBind(String queue, String exchange , String routingKey) throws IOException;
- Queue.BindOk queueBind(String queue, String exchange, Map<String, Object> arguments) throws IOException;
- void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
针对 go,队列和交换机绑定的方法如下:
func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error {
if err := args.Validate(); err != nil {
return err
}
return ch.call(
&queueBind{
Queue: name,
Exchange: exchange,
RoutingKey: key,
NoWait: noWait,
Arguments: args,
},
&queueBindOk{},
)
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
方法中涉及的参数详解。
- queue: 队列名称;
- exchange: 交换器的名称;
- routingKey: 用来绑定队列和交换器的路由键;
- argument: 定义绑定的一些参数。
不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。具体方法可以参考如下 (具体的参数解释可以参考前面的内容,这里不再赘述)
- Queue.UnbindOk queueUnbind(String queue, String exchange , String routingKey) throws IOException;
- Queue.UnbindOk queueUnbind(String queue , String exchange , String routingKey , Map<String , Object> arguments) throws IOException;
# exchangeBind 方法详解
我们不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定,后者和前者的用法如出一辙,相应的方法如下:
- Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
- Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String , Object> arguments) throws IOException ;
- void exchangeBindNoWait(String destination , String sour ce , String routingKey ,Map<String , Object> arguments)throws IOException;
方法中的参数可以参考 3 .2.1 节的 exchangeDeclare 方法。绑定之后 ,消息从 source 交换器转发到 destination 交换器,某种程度上来说 destination 交换器可以看作一个队列 。示例代码如代码清单 3-8 所示。
代码清单 3-8
channel.exchangeDeclare("source", "direct", false, true, null);
channel.exchangeDeclare("destination" , "fanout", false , true, null);
channel.exchangeBind("destination" , "source", "exKey");
channel.queueDeclare("queue", false, false, true, null);
channel.queueBind("queue", "destination ", "");
channel.basicPublish("source", "exKey", null, "exToExDemo".getBytes());
2
3
4
5
6
生产者发送消息至交换器 source 中,交换器 source 根据路由键找到与其匹配的另一个交换器 destination , 井把消息转发到 destination 中 , 进而存储在 destination
绑定的队列 queue 中 , 可参考图 3-1 。
# 何时创建
RabbitMQ 的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如果要衡量 RabbitMQ 当前的 QPS 只需看队列的即可。在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。
按照 RabbitMQ 官方建议,生产者和消费者都应该尝试创建 (这里指声明操作) 队列。这是一个很好的建议,但不适用于所有的情况。如果业务本身在架构设计之初己经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好 (比如通过页面管理、 RabbitMQ 命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。
预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。很多时候, 由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失;或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。 当然可以配合 mandatory 参数或者备份交换器 (详细可参考 4.1 节) 来提高程序的健壮性。
与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预定的阔值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。迁移的过程也可以对业务程序完全透明。此种方法也更有利于开发和运维分工,便于相应资源的管理。
如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。
至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。