Ezreal 书架 Ezreal 书架
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
Home
  • 《Go程序员面试笔试宝典》
  • 《RabbitMQ 实战指南》
  • 《深入理解kafka》
  • MySQL45讲
  • 透视HTTP协议
  • 结构化数据的分布式存储系统
  • Raft 共识算法
  • RabbitMQ简介

  • RabbitMQ 入门

  • 客户端开发向导

    • 连接 RabbitMQ
    • 使用交换器和队列
    • 发送消息
    • 消费消息
    • 消费端的确认与拒绝
    • 关闭连接
    • 小结
  • RabbitMQ 进阶

  • RabbitMQ 管理

  • RabbitMQ实战指南
  • 客户端开发向导
ezreal_rao
2023-03-31

连接 RabbitMQ

下面的代码 (代码清单 3-1) 用来在给定的参数 (IP 地址、端口号、用户名、密码等) 下连接 RabbitMQ:

代码清单 3-1

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(IP ADDRESS); factory . setPort(PORT);
Connection conn = factory.newConnection();
1
2
3
4
5
6

也可以选择使用 URI 的方式来实现,示例如代码清单 3-2 所示。

代码清单 3-2

ConnectionFactory factory =new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection 接口被用来创建一个 Channel: 
Channel channel = conn.createChannel();
1
2
3
4
5

在创建之后,Channel 可以用来发送或者接收消息了。

注意

Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程问共享, 应用程序应该为每一个线程开辟一个 Channel 。某些情况下 Channel 的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响友送方确认 (publisher confirm) 机制的运行 (详细可以参考 4.8 节),所以多线程问共享 Channel 实例是非线程安全的 。

Channel 或者 Connection 中有个 isOpen 方法可以用来检测其是否己处于开启状态 (关于 Channel 或者 Connection 的状态可以参考 3.6 节)。但并不推荐在生产环境的代码上使用 isOpen 方法,这个方法的返回值依赖于 shutdownCause (参考下面的代码) 的存在,有可能会产生竞争,代码清单 3-3 是 isOpen 方法的源码:

代码清单 3-3 isOpen 方法的源码

public boolean isOpen() {
    synchronized(this.monitor) {
        return this.shutdownCause == null;
    }
}
1
2
3
4
5

错误地使用 isOpen 方法示例代码如代码清单 3 -4 所示。

代码清单 3-4 错误地使用 isOpen 方法

public void brokenMethod(Channel channel)
{
  if (channel.isOpen())
  {
    ....
    channel.basicQos(1);
  }
}
1
2
3
4
5
6
7
8

通常情况下,在调用 createXXX 或者 newXXX 方法之后,我们可以简单地认为 Connection 或者 Channel 已经成功地处于开启状态,而并不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其己经处于关闭状态,那么程序会抛出一个 com.rabbitmq.client.ShutdownSignalException,我们只需捕获这个异常即可。当然同时也要试着捕获 IOException 或者 SocketException,以防 Connection 意外关闭。

示例代码如代码清单 3-5 所示。

代码清单 3-5

public void validMethod(Channel channel)
{
  try {
    ....
    channel.basicQos(1);
  } catch (ShutdownSignalException sse) {
    ...
  } catch (IOException ioe) {
    ...
  }
}
1
2
3
4
5
6
7
8
9
10
11

提示

以下不是该书内容,为本人新增部分。

以上是 java 的实现方式,针对 go,可以用如下方法实现:

package main

import "github.com/streadway/amqp"

func main() {
	// 建立 connection
	conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")

	// 建立 channel
	ch, _ := conn.Channel()

	// 判断 conn 是否关闭
	conn.IsClosed()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

在 go 中可以用 IsClosed () 方法判断 connection 是否关闭,这个方法不会产生竞争,因为方法是原子操作。源码如下:

// IsClosed returns true if the connection is marked as closed, otherwise false
// is returned.
func (c *Connection) IsClosed() bool {
	return (atomic.LoadInt32(&c.closed) == 1)
}
1
2
3
4
5
#mq#rabbitmq#amq
上次更新: 5/9/2023, 10:58:32 AM
小结
使用交换器和队列

← 小结 使用交换器和队列→

最近更新
01
为什么我的MySQL会抖一下
07-15
02
HTTP 性能优化面面观
07-12
03
WebSocket:沙盒里的 TCP
07-12
更多文章>
Theme by Vdoing | Copyright © 2022-2024 Ezreal Rao | CC BY-NC-SA 4.0
豫ICP备2023001810号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式