消费消息

RabbitMQ 消费模式分两种:

  • Push:推模式采用 Basic.Consume 进行消费
  • Pull:拉模式则使用 Basic.Get 进行消费

    #推模式

    在推模式中,可以通过持续订阅的方式来消费消息,相关消费类如下:

  • com.rabbitmq.client.DefaultConsumer

  • com.rabbitmq.client.Consumer

接收消息,一般实现 Consumer 接口,或则继承 DefaultConsumer 来实现。
推模式的关键代码如下

  1. boolean autoAck = false;
  2. // 设置客户端最多接收未被 ack 的消息个数
  3. channel.basicQos(64);
  4. channel.basicConsume(QUEUE_NAME, autoAck, new DefaultConsumer(channel) {
  5. @Override
  6. public void handleDelivery(String consumerTag,
  7. Envelope envelope,
  8. AMQP.BasicProperties properties,
  9. byte[] body) throws IOException {
  10. final String routingKey = envelope.getRoutingKey();
  11. final String contentType = properties.getContentType();
  12. final long deliveryTag = envelope.getDeliveryTag();
  13. // 处理消息
  14. channel.basicAck(deliveryTag, false);
  15. }
  16. });
  17. Copied!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

  • consumerTag:不同的订阅有不同的标签
  • autoAck:在订阅队列时,设置了 false,在接收处理消息后,显示的 ack 操作。可以防止消息不必要的丢失

basicConsume 的全参数说明如下

  1. String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
  2. Copied!

1

  • queue:订阅的队列名称
  • autoAck:是否自动确认,建议设置成 false
  • consumerTag:消费者标签,用来区分多个消费者
  • noLocal:设置为 true,则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者
  • exclusive:是否排他
  • arguments:消费者的其他参数
  • callback:回调函数。用来处理 RabbitMQ 推送过来的消息。

实现回调函数时,重新 handleDelivery 方法,对客户端消费者来说很方便。更复杂的消费者客户端会重新更多的方法。

  • void handleConsumeOk(String consumerTag);会在其他方法之前调用,返回消费者标签
  • void handleCancelOk(String consumerTag);显示的取消一个消费者订阅时被调用,比如调用 channel.basicCancel(consumerTag),该 basicCancel 方法,触发顺序是: handleConsumeOk、handleDelivery 、handleCancelOk
  • void handleCancel(String consumerTag) throws IOException;隐式的取消消费者订阅时调用。
  • void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);当 Channel 或则 Connection 关闭时调用
  • void handleRecoverOk(String consumerTag);

和生产者一样,建议每个线程拥有自己的 Channel ,不要线程共享。业务是线程不安全的。

#拉模式

拉模式是通过以下方式主动获取消息,消费消息

  1. // 同样还是 channel 类
  2. GetResponse basicGet(String queue, boolean autoAck) throws IOException;
  3. Copied!

1
2

  • queue:队列名称
  • autoAck:如果设置为 false,则需要调用 channel.basicAck 来确认消息被消费

如下面的消费示例:

  1. final GetResponse response = channel.basicGet(QUEUE_NAME, false);
  2. System.out.println(new String(response.getBody()));
  3. channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
  4. Copied!

1
2
3

注意是:basicGet 是一次获取一条消息,而推模式可以通过 channel.basicQos(3); 设置一次让服务器发送多少条消息。所以不要在循环中使用拉模式,这严重影响性能。