关闭连接

  1. channel.close(); // 不是必须的
  2. connection.close(); // Connection 关闭会自动关闭 channel
  3. Copied!

1
2

AMQP 协议中的 Connection 和 channel 采用同样的方式管理网络失败、内部错误和显式关闭连接。他们的生命周期如下:

  • Open:开启状态,当前对象可用
  • Closing:正在关闭状态当前对象被显式调用关闭方法 shutdown 。
  • Closed:已经关闭状态当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且自己也关闭了。

无论是程序正常调用关闭方法、客户端异常、网络异常等,Connection 和 channel 最终都会成为 Closed 状态。

#ShutdownListener

与关闭相关的方法有:

  • public void addShutdownListener(ShutdownListener listener);
  • public void removeShutdownListener(ShutdownListener listener);

当 Connection 和 Channel 的状态变为 Closed 时会调用 shutdownListener;如果将一个 ShutdownListener 注册到已关闭的对象上,则会立即调用 shutdownListener

  1. public interface ShutdownListener extends EventListener {
  2. public void shutdownCompleted(ShutdownSignalException cause);
  3. }
  4. Copied!

1
2
3

比如这个示例

  1. channel.addShutdownListener(new ShutdownListener() {
  2. @Override
  3. public void shutdownCompleted(ShutdownSignalException cause) {
  4. System.out.println("关闭原因:" + cause.getReason());
  5. System.out.println(cause.isHardError() ? "connection 异常" : "channel 异常");
  6. }
  7. });
  8. // 等待消费者回调后,关闭资源
  9. TimeUnit.SECONDS.sleep(10);
  10. channel.close();
  11. connection.close();
  12. TimeUnit.SECONDS.sleep(10);
  13. Copied!

1
2
3
4
5
6
7
8
9
10
11
12

10 秒后,调用 close 时,则会打印如下信息

  1. 关闭原因:#method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
  2. channel 异常
  3. Copied!

1
2

同时正在消费提交 ack 操作的线程都会异常;因为 channel 关闭了,再操作,肯定会异常

  1. 13:27:18.655 [pool-1-thread-7] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Consumer cn.mrcode.rabbitmq.RabbitConsumer$1@6533f373 (amq.ctag-AmFm8HfjqayFInEYC3AliA) method handleDelivery for channel AMQChannel(amqp://admin@192.168.4.250:5672/,1) threw an exception for channel AMQChannel(amqp://admin@192.168.4.250:5672/,1)
  2. com.rabbitmq.client.AlreadyClosedException: channel is already closed due to clean channel shutdown; protocol method: #method<channel.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
  3. at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:228)
  4. at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:371)
  5. at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:365)
  6. at com.rabbitmq.client.impl.ChannelN.basicAck(ChannelN.java:1169)
  7. at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:89)
  8. at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:436)
  9. at cn.mrcode.rabbitmq.RabbitConsumer$1.handleDelivery(RabbitConsumer.java:52)