Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

    • 相关概念介绍
    • AMQP 协议介绍
      • AMQP 生产者流转过程
      • AMQP 消费者流转过程
      • AMQP 命令概览
    • 小结
  • 客户端开发向导

  • RabbitMQ 进阶

  • RabbitMQ 管理

  • RabbitMQ实战指南
  • RabbitMQ 入门
ezreal_rao
2023-03-30
目录

AMQP 协议介绍

从前面的内容可以了解到 RabbitMQ 是遵从 AMQP 协议的, 换句话说 , RabbitMQ 就是 AMQP 协议的 Erlang 的实现 (当然 RabbitMQ 还支持 STOMP2 、 MQTT3 等协议 )。AMQP 的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的 RoutingKey 与绑定时的 BindingKey 相匹配时,消息即被存入相应的队列之中。消费者可以订阅相应的队列来获取消息。

2: STOMP. 即 Simple (or Streaming) Text Oriented Messaging Protocol. 简单(流)文本面向消息协议,它提供了一个可互操作的连接格式,运行 STOMP 客户端与任意 STOMP 消息代理(Broker)进行交互。STOMP 协议由于设计简单,易于开发客户端,因此在多种语言和平台上得到广泛的应用。3: MQTT. 即 Message Queuing Telemetry Transport. 消息队列遥测传输,是 IBM 开发的一个即时通信协议,有可能成为物联网的重要组成部分.该协议支持所有平台,几乎可以把所有物联网和外部连接起来,被用来当作传感器和制动器的通信协议。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相应的概念。目前 RabbitMQ 最新版本默认支持的是 AMQP 0-9-1 。本书中如无特殊说明,都以 AQMP 0-9-1 为基准进行介绍。

AMQP 协议本身包括三层。

  • Module Layer: 位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用 Queue.Declare 命令声明一个队列或者使用 Basic.Consume 订阅消费一个队列中的消息。
  • Session Layer: 位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
  • Transport Layer: 位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

AMQP 说到底还是一个通信协议,通信协议都会涉及报文交互,从 low-level 举例来说, AMQP 本身是应用层的协议,其填充于 TCP 协议层的数据部分。而从 high-level 来说, AMQP 是通过协议命令进行交互的。 AMQP 协议可以看作一系列结构化命令的集合,这里的命令代表一种操作,类似于 HTTP 中的方法 (GET、 POST、 PUT、 DELETE 等) 。

# AMQP 生产者流转过程

为了形象地说明 AMQP 协议命令的流转过程,这里截取代码清单 1-1 中的关键代码,如代 码清单 2-2 所示。

代码清单2-2 简洁版生产者代码

Connection connection = factory.newConnection() ; //创建连接 Channel channel = connection.createChannel() ; //创建信道
String message = " Hello World! ";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY ,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
//关闭资源
channel.close();
connection.close();
1
2
3
4
5
6

当客户端与 Broker 建立连接的时候,会调用 factory.newConnection 方法,这个方法会进一步封装成 Protocol Header 0-9-1 的报文头发送给 Broker,以此通知 Broker 本次交互采用的是 AMQPO-9-1 协议,紧接着 Broker 返回 Connection.Start 来建立连接,在连接的过程中涉及 Connection.Start/.Start-OK、 Connection.Tune/.Tune-Ok, Connection.Open/.Open-Ok 这 6 个命令的交互。

当客户端调用 connection.createChannel 方法准备开启信道的时候,其包装 Channel.Open 命令发送给 Broker,等待 Channel.Open-Ok 命令 。

当客户端发送消息的时候,需要调用 channel.basicPublish 方法,对应的 AQMP 命令为 Basic.Publish,注意这个命令和前面涉及的命令略有不同,这个命令还包含了 Content Header 和 Content Body。Content Header 里面包含的是消息体的属性,例如,投递模式 (可以参 考 3.3 节)、优先级等,而 Content Body 包含消息体本身。

当客户端发送完消息需要关闭资源时,涉及 Channel.Close/.Close-Ok 与 Connection.Close/.Close-Ok 的命令交互。详细流转过程如图 2-10 所示。

流转过程

# AMQP 消费者流转过程

本节我们继续来看消费者的流转过程, 参考代码清单 1-2, 截取消费端的关键代码如代码清单 2-3 所示。

代码清单 1-2

Connection connection = factory.newConnection(addresses);// 创建连接

final Channel channel = connection . createChannel() ; //创建信道 
Consumer consumer = new DefaultConsumer(channel) {}//_省略实现 
channel.basicQos(64); 
channel.basicConsume(QUEUE NAME , consumer);

//等待回调函数执行完毕之后,关闭资源
TimeUnit.SECONDS.sleep(5); 
channel.close();
connection.close();
1
2
3
4
5
6
7
8
9
10
11

其详细流转过程如图 2-11 所示。

流转过程

消费者客户端同样需要与 Broker 建立连接,与生产者客户端一样,协议交互同样涉及 Connection.Start/.Start-Ok、 Connection.Tune/.Tune-Ok 和 Connection.Open/.Open-Ok 等,图 2-11 中省略了这些步骤,可以参考图 2-10 。

紧接着也少不了在 Connection 之上建立 Channel,和生产者客户端一样,协议涉及 Channel.Open/Open-Ok。

如果在消费之前调用了 channel.basicQos (int prefetchCount) 的方法来设置消费者客户端最大能 "保持" 的未确认的消息数,那么协议流转会涉及 Basic.Qos/.Qos-Ok 这两个 AMQP 命令。

在真正消费之前,消费者客户端需要向 Broker 发送 Basic.Consume 命令 (即调用 channel.basicConsume 方法) 将 Channel 置为接收模式,之后 Broker 回执 Basic.Consume-Ok 以告诉消费者客户端准备好消费消息。紧接着 Broker 向消费者客户端推送 (Push) 消息,即 Basic.Deliver 命令,有意思的是这个和 Basic.Publish 命令一样会携带 Content Header 和 Content Body。

消费者接收到消息并正确消费之后,向 Broker 发送确认,即 Basic.Ack 命令。

在消费者停止消费的时候,主动关闭连接,这点和生产者一样,涉及 Channel.Close/.Close-Ok 和 Connection.Close/.Close-Ok 。

# AMQP 命令概览

AMQP 0-9-1 协议中的命令远远不止上面所涉及的这些,为了让读者在遇到其他命令的时候 能够迅速查阅相关信息,下面列举了 AMQP 0-9-1 协议主要的命令,包含名称、是否包含内容体 (Content Body)、对应客户端中相应的方法及简要描述等四个维度进行说明,具体如表 2-1 所示。

amqp命令
amqp命令
amqp命令

#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
相关概念介绍
小结

← 相关概念介绍 小结→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式