Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

    • 什么是消息中间件
    • 消息中间件的作用
    • RabbitMQ 的起源
    • RabbitMQ 的安装及简单使用
      • 安装 Erlang
      • RabbitMQ 的安装
      • RabbitMQ 的运行
      • 生产和消费消息
    • 小结
  • RabbitMQ 入门

  • 客户端开发向导

  • RabbitMQ 进阶

  • RabbitMQ 管理

  • RabbitMQ实战指南
  • RabbitMQ简介
ezreal_rao
2023-03-21
目录

RabbitMQ 的安装及简单使用

这里首先介绍 RabbitMQ 的安装过程,然后演示发送和消费消息的具体实现,以期让读者对 RabbitMQ 有比较直观的感受。

前面提到了 RabbitMQ 是由 Erlang 语言编写的,也正因如此,在安装 RabbitMQ 之前需要安装 Erlang 。建议采用较新版的 Erlang ,这样可以获得较多更新和改进,可以到官网 (http://www.erlang.org/downloads) 下载。截止本书撰稿,最新版本为 20.0,本书示例大多采用 19.x 的版本。

本书如无特指,所有程序都是在 Linux 下运行的,毕竟 RabbitMQ 大多部署在 Linux 操作系 统之中。

注意

本人阅读本书的时候,erlang 已经到了 25.x 版本了,所以我安装的是 25.x 版本。

# 安装 Erlang

下面首先演示 Erlang 的安装。第一步,解压安装包,并配置安装目录,这里我们预备安装 到 /opt/erlang 目录下:

root@ezrealrao:/opt/erlang# tar zxvf otp_src_25.3.tar.gz 
root@ezrealrao:/opt/erlang# cd otp_src_25.3/
root@ezrealrao:/opt/erlang/otp_src_25.3# ./configure --prefix=/opt/erlang
1
2
3

第二步,如果出现类似关键报错信息: No curses library functions found。那么 此时需要安装 ncurses ,安装步骤 (遇到提示输入 y 后直接回车即可) 如下:

[root@hidden otp_src_19.3]# apt-get install ncurses-devel
1

第三步,安装 Erlang:

[root@hidden otp_src_19.3]# make 
[root@hidden otp_src_19.3]# make install
1
2

如果在安装的过程中出现类似 " No ***** found " 的提示,可根据提示信息安装相应的包,之后再执行第二或者第三步,直到提示安装完毕为止。

第四步,修改 /etc/profile 配置文件,添加下面的环境变量:

ERLANG_HOME=/opt/erlang
export PATH=$PATH:$ERLANG_HOME/bin
export ERLANG_HOME
1
2
3

最后执行如下命令让配置文件生效 :

root@ezrealrao:/opt/erlang/otp_src_25.3# source /etc/profile
1

可以输入 erl 命令来验证 Erlang 是否安装成功,如果出现类似以下的提示即表示安装成功:

root@ezrealrao:/opt/erlang/otp_src_25.3# erl
Erlang/OTP 25 [erts-13.2] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1]

Eshell V13.2  (abort with ^G)
1> 
1
2
3
4
5

# RabbitMQ 的安装

RabbitMQ 的安装比 Erlang 的安装要简单,直接将下载的安装包解压到相应的目录下即可, 官网下载地址 : http://www.rabbitmq.comlreleases/rabbitmq-se 凹 er/ 。本书撰稿时的最新版本 3.6.12 ,本书示例大多采用同一系列的 3.6.x 版本。

注意

我学习本书的时候,rabbitmq 版本为 3.11.x,所以我安装的为 3.11.11 版本。下载地址为:https://www.rabbitmq.com/install-generic-unix.html

这里选择将 RabbitMQ 安装到与 Erlang 同一个目录 (/opt) 下面 :

root@ezrealrao:/opt# tar -xvf  rabbitmq-server-generic-unix-3.11.11.tar.xz -C /opt
root@ezrealrao:/opt# cd /opt/
root@ezrealrao:/opt# mv rabbitmq_server-3.11.11 rabbitmq
1
2
3

同样修改 /etc/profile 文件 , 添加下面的环境变量 :

export PATH=$PATH:/opt/rabbitmq/sbin 
export RABBITMQ_HOME=/opt/rabbitmq
1
2

之后执行 source/etc/profile 命令让配置文件生效。

# RabbitMQ 的运行

在修改了 /etc/profile 配置文件之后,可以任意打开一个 Shell 窗口,输入如下命令以 运行 RabbitMQ 服务 :

root@ezrealrao:~# rabbitmq-server -detached
1

在 rabbitmq-s erver 命令后面添加一个 "-detached" 参数是为了能够让 RabbitMQ 服务以守护进程的方式在后台运行,这样就不会因为当前 Shell 窗口的关闭而影响服务。

运行 rabbitmqctl status 命令查看 RabbitMQ 是否正常启动,示例如下:

root@ezrealrao:~# rabbitmqctl status
Status of node rabbit@ezrealrao ...
Runtime

OS PID: 94210
OS: Linux
Uptime (seconds): 2414
Is under maintenance?: false
RabbitMQ version: 3.8.11
Node name: rabbit@ezrealrao
Erlang configuration: Erlang/OTP 23 [erts-11.2.1] [source] [64-bit] [smp:2:2] [ds:2:2:10] [async-threads:1] [hipe]
Erlang processes: 278 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60

Plugins

Enabled plugin file: /opt/rabbitmq/etc/rabbitmq/enabled_plugins
Enabled plugins:


Data directory

Node data directory: /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@ezrealrao
Raft data directory: /opt/rabbitmq/var/lib/rabbitmq/mnesia/rabbit@ezrealrao/quorum/rabbit@ezrealrao

Config files


Log file(s)

 * /opt/rabbitmq/var/log/rabbitmq/rabbit@ezrealrao.log
 * /opt/rabbitmq/var/log/rabbitmq/rabbit@ezrealrao_upgrade.log

Alarms

(none)

Memory

Total memory used: 0.0825 gb
Calculation strategy: rss
Memory high watermark setting: 0.4 of available memory, computed to: 1.4341 gb

code: 0.0292 gb (35.06 %)
other_proc: 0.0263 gb (31.55 %)
other_system: 0.0125 gb (15.05 %)
allocated_unused: 0.0101 gb (12.16 %)
other_ets: 0.0028 gb (3.36 %)
atom: 0.0014 gb (1.64 %)
plugins: 0.0004 gb (0.53 %)
metrics: 0.0002 gb (0.26 %)
binary: 0.0002 gb (0.19 %)
mnesia: 0.0001 gb (0.11 %)
quorum_ets: 0.0 gb (0.06 %)
msg_index: 0.0 gb (0.04 %)
connection_channels: 0.0 gb (0.0 %)
connection_other: 0.0 gb (0.0 %)
connection_readers: 0.0 gb (0.0 %)
connection_writers: 0.0 gb (0.0 %)
mgmt_db: 0.0 gb (0.0 %)
queue_procs: 0.0 gb (0.0 %)
queue_slave_procs: 0.0 gb (0.0 %)
quorum_queue_procs: 0.0 gb (0.0 %)
reserved_unallocated: 0.0 gb (0.0 %)

File Descriptors

Total: 2, limit: 927
Sockets: 0, limit: 832

Free Disk Space

Low free disk space watermark: 0.05 gb
Free disk space: 49.3874 gb

Totals

Connection count: 0
Queue count: 0
Virtual host count: 1

Listeners

Interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86

如果 RabbitMQ 正常启动 , 会输出如上所示的信息。当然也可以通过 rabbitmqctl cluster_status 命令来查看集群信息,目前只有一个 RabbitMQ 服务节点,可以看作单节点 的集群 :

root@ezrealrao:~# rabbitmqctl cluster_status
Cluster status of node rabbit@ezrealrao ...
Basics

Cluster name: rabbit@localhost.localdomain

Disk Nodes

rabbit@ezrealrao

Running Nodes

rabbit@ezrealrao

Versions

rabbit@ezrealrao: RabbitMQ 3.8.11 on Erlang 23.3.2

Maintenance status

Node: rabbit@ezrealrao, status: not under maintenance

Alarms

(none)

Network Partitions

(none)

Listeners

Node: rabbit@ezrealrao, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
Node: rabbit@ezrealrao, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
Node: rabbit@ezrealrao, interface: [::], port: 15672, protocol: http, purpose: HTTP API

Feature flags

Flag: drop_unroutable_metric, state: disabled
Flag: empty_basic_get_metric, state: disabled
Flag: implicit_default_bindings, state: enabled
Flag: maintenance_mode_status, state: enabled
Flag: quorum_queue, state: enabled
Flag: user_limits, state: enabled
Flag: virtual_host_metadata, state: enabled
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

提示

具体安装可以查看:源码安装 RabbitMQ (opens new window)

# 生产和消费消息

本节将演示如何使用 RabbitMQ Java 客户端生产和消费消息。本书中如无特殊说明, 示例 都采用 Java 语言来演示, 包括 RabbitMQ 官方文档基本上也是采用 Java 语言来进行演示的。当然如前面所提及的 , RabbitMQ 客户端可以支持很多种语言。

提示

因为我的编程主语言为 GoLang,而且我也不会 Java,所以下面教程我全部替换成 GOLANG 语言的了。。

首先,使用 go get 安装 amqp

go get github.com/rabbitmq/amqp091-go
1

读者可以根据项目的实际情况进行调节。
默认情况下,访 问 RabbitMQ 服务 的用户名和密码都是 "guest" ,这个账户有限制,默认只能通过本地网络 (如 localhost) 访问,远程网络访问受限,所以在实现生产和消费消息之前, 需要另外添加一个用户,并设置相应的访问权限。

添加新用户,用户名为 "root" ,密码为 "root123":

root@ezrealrao:~# rabbitmqctl add_user root root
Adding user "root" ...
Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
1
2
3

为 root 用户设置所有权限:

root@ezrealrao:~# rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
Setting permissions for user "root" in vhost "/" ...
1
2

设置 root 用户为管理员角色:

root@ezrealrao:~# rabbitmqctl set_user_tags root administrator
Setting tags for user "root" to [administrator] ...
1
2

如果读者在使用 RabbitMQ 的过程中遇到类似如下的报错,那么很可能就是账户管理的问题,需要根据上面的步骤进行设置,之后再运行程序。

Exception in thread " main" com.rabbitmq.c1ient.AuthenticationFai1ureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker 1ogfi1e.
1

计算机的世界是从 "Hello World!" 开始的 ,这里我们也沿用惯例, 首先生产者发送一条消息 "Hello World!" 至 RabbitMQ 中, 之后由消费者消费。下面先演示生产者客户端的代码 (代码清单 1-1),接着再演示消费者客户端的代码 (代码清单 1-2)。

代码清单 1-1 生产者客户端代码

send.go (opens new window)

package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {

  // 1. 尝试连接RabbitMQ,建立连接
  // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
	conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

  // 2. 接下来,我们创建一个通道,大多数API都是用过该通道操作的。
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

  // 3. 声明消息要发送到的队列
	q, err := ch.QueueDeclare(
		"hello",
		false,
		false,
		false,
		false,
		nil,
	)
	failOnError(err, "Failed to declare a queue")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

  // 4.将消息发布到声明的队列
	body := "Hello World!"
	err = ch.PublishWithContext(
		ctx,
		"",
		q.Name,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s\n", body)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

在 send.go 中,我们需要首先导入 amqp 库,然后连接到 RabbitMQ 服务器,连接抽象了 socket 连接,并为我们处理协议版本协商和认证等。接下来,我们创建一个通道,这是大多数用于完成任务的 API 所在的位置,要发送,我们必须声明要发送到的队列。然后我们可以将消息发布到队列。声明队列是幂等的 —— 仅当队列不存在时才创建。消息内容是一个字节数组,因此你可以在消息体处编码任何内容。

代码清单 1-2 消费者客户端代码

receiver.go (opens new window)

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	conn, err := amqp.Dial("amqp://root:root@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")
	var forever chan struct{}

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	<-forever
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

该代码(在 receive.go 中)具有与 send 相同的导入和帮助功能,设置与发布者相同;我们打开一个连接和一个通道,并声明要消耗的队列。请注意,这与 send 发布到的队列匹配。我们也在这里声明队列。是因为我们可能在发布者之前启动使用者,所以我们希望在尝试使用队列中的消息之前确保队列存在。我们将告诉服务器将队列中的消息传递给我们。由于它将异步地向我们发送消息,因此我们将在 goroutine 中从通道(由 amqp::Consume 返回)中读取消息。

通过上面的演示,相信各位读者对 RabbitMQ 有了一个初步的认识 。 但是这也仅仅是个开始,路漫漫其修远兮,愿君能上下而求索。

#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
RabbitMQ 的起源
小结

← RabbitMQ 的起源 小结→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式