连接 RabbitMQ
下面的代码 (代码清单 3-1) 用来在给定的参数 (IP 地址、端口号、用户名、密码等) 下连接 RabbitMQ:
代码清单 3-1
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(virtualHost);
factory.setHost(IP ADDRESS); factory . setPort(PORT);
Connection conn = factory.newConnection();
2
3
4
5
6
也可以选择使用 URI 的方式来实现,示例如代码清单 3-2 所示。
代码清单 3-2
ConnectionFactory factory =new ConnectionFactory();
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");
Connection conn = factory.newConnection();
Connection 接口被用来创建一个 Channel:
Channel channel = conn.createChannel();
2
3
4
5
在创建之后,Channel 可以用来发送或者接收消息了。
注意
Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程问共享, 应用程序应该为每一个线程开辟一个 Channel 。某些情况下 Channel 的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响友送方确认 (publisher confirm) 机制的运行 (详细可以参考 4.8 节),所以多线程问共享 Channel 实例是非线程安全的 。
Channel 或者 Connection 中有个 isOpen 方法可以用来检测其是否己处于开启状态 (关于 Channel 或者 Connection 的状态可以参考 3.6 节)。但并不推荐在生产环境的代码上使用 isOpen 方法,这个方法的返回值依赖于 shutdownCause (参考下面的代码) 的存在,有可能会产生竞争,代码清单 3-3 是 isOpen 方法的源码:
代码清单 3-3 isOpen 方法的源码
public boolean isOpen() {
synchronized(this.monitor) {
return this.shutdownCause == null;
}
}
2
3
4
5
错误地使用 isOpen 方法示例代码如代码清单 3 -4 所示。
代码清单 3-4 错误地使用 isOpen 方法
public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
....
channel.basicQos(1);
}
}
2
3
4
5
6
7
8
通常情况下,在调用 createXXX 或者 newXXX 方法之后,我们可以简单地认为 Connection 或者 Channel 已经成功地处于开启状态,而并不会在代码中使用 isOpen 这个检测方法。如果在使用 Channel 的时候其己经处于关闭状态,那么程序会抛出一个 com.rabbitmq.client.ShutdownSignalException,我们只需捕获这个异常即可。当然同时也要试着捕获 IOException 或者 SocketException,以防 Connection 意外关闭。
示例代码如代码清单 3-5 所示。
代码清单 3-5
public void validMethod(Channel channel)
{
try {
....
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
...
} catch (IOException ioe) {
...
}
}
2
3
4
5
6
7
8
9
10
11
提示
以下不是该书内容,为本人新增部分。
以上是 java 的实现方式,针对 go,可以用如下方法实现:
package main
import "github.com/streadway/amqp"
func main() {
// 建立 connection
conn, _ := amqp.Dial("amqp://root:root@localhost:5672/")
// 建立 channel
ch, _ := conn.Channel()
// 判断 conn 是否关闭
conn.IsClosed()
}
2
3
4
5
6
7
8
9
10
11
12
13
14
在 go 中可以用 IsClosed () 方法判断 connection 是否关闭,这个方法不会产生竞争,因为方法是原子操作。源码如下:
// IsClosed returns true if the connection is marked as closed, otherwise false
// is returned.
func (c *Connection) IsClosed() bool {
return (atomic.LoadInt32(&c.closed) == 1)
}
2
3
4
5