发送消息
如果要发送一个消息,可以使用 Channel 类的 basicPublish 方法,比如发送一条内容 为 "Hello World!" 的消息,参考如下:
byte[] messageBodyBytes = "Hello , world! ".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
1
2
2
为了更好地控制发送,可以使用 mandatory 这个参数, 或者可以发送一些特定属性的信息:
channe1.basicPub1ish(exchangeName , routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes) ;
1
针对 go,代码如下:
err = ch.PublishWithContext(context.Background(), "", q.Name, true, false, amqp091.Publishing{
ContentType: "text/plain",
Body: []byte("hello world"),
})
if err != nil {
log.Fatal(err)
}
1
2
3
4
5
6
7
2
3
4
5
6
7
下面这行代码发送了一条消息,这条消息的投递模式 (delivery mode) 设直为 2 ,即消息会被持久化 (即存入磁盘) 在服务器中。同时这条消息的优先级 (priority) 设置为 1 , 为 "text/plain" 。 可以自己设定消息的属性:
channe1.basicPub1ish(exchangeName, routingKey, new AMQP.BasicPropertieS.Builder().contentType("text/p1ain").deliveryMode(2).priority(1).userld("hidden").build()), messageBodyBytes);
1
也可以发送一条带有 headers 的消息 :
Map<String , Object> headers = new HashMap<String, Object>();
headers.put("loca1tion", "here");
headers.put("time", "today");
channe1.basicPublish(exchangeName, routingKey, new AMQP.BasicProperties.Builder().headers(headers).build()), messageBodyBytes);
1
2
3
4
2
3
4
还可以发送一条带有过期时间 (expiration) 的消息 :
channe1.basicPub1ish(exchangeName, routingKey, new AMQP.BasicProperties.Bui1der().expiration("60000").build()), messageBodyBytes) ;
1
以上只是举例,由于篇幅关系,这里就不一一列举所有的可能情形了。对于 basicPublish 而言,有几个重载方法:
void basicPublish(String exchange , String routingKey , BasicProperties props , byte[] body) throws IOException;
void basicPub1ish(String exchange, String routingKey, boo1ean mandatory, BasicProperties props , byte[] body) throws IOException;
void basicPublish(String exchange ,String routingKey , boolean mandatory ,boolean immediate , BasicProperties props , byte[] body) throws IOException;
对应的具体参数解释如下所述。
- exchange: 交换器的名称,指明消息需要发送到哪个交换器中。 如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。
- routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中。
- props: 消息的基本属性集,其包含 14 个属性成员,分别有 contentType、 contentEncoding、 headers (Map<String , Object>) 、 deliveryMode 、 priority、 correlationId、 replyTo 、 expiration 、 messageld、 timestamp、 type 、 userId、 appId、 clusterId。其中常用的几种都在上面的示例中进行了演示。
- byte [] body: 消息体 ( pay1oad) ,真正需要发送的消息。
- mandatory 和 immediate 的详细内容请参考 4.1 节 。
props 各属性含义如下:
属性名 | 说明 |
---|---|
content-type | 消息体的 MIME 类型,如 application/json |
content-encoding | 消息的编码类型,如是否压缩 |
message-id | 消息的唯一性标识,由应用进行设置 |
correlation-id | 一般用做关联消息的 message-id,常用于消息的响应 |
timestamp | 消息的创建时刻,整型,精确到秒 |
expiration | 消息的过期时刻, 字符串,但是呈现格式为整型,精确到秒 |
delivery-mode | 消息的持久化类型,1 为非持久化,2 为持久化,性能影响巨大 |
app-id | 应用程序的类型和版本号 |
user-id | 标识已登录用户,极少使用 |
type | 消息类型名称,完全由应用决定如何使用该字段 |
reply-to | 构建回复消息的私有响应队列 |
headers | 键 / 值对表,用户自定义任意的键和值 |
priority | 指定队列中消息的优先级 |
上次更新: 5/9/2023, 10:58:32 AM