问题

:::tips 当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限,之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题

解决消息堆积有两种思路:

  • 增加更多消费者,提高消费速度,也就是使用work queue工作队列模型
  • 扩大队列容积,提高堆积上限

要提升队列容积,把消息保存在内存中显然是不行的

  • 队列过长的话会占用系统较多内存,RabbitMQ为了释放内存,会将队列消息转储到硬盘,称为page out,如果队列很长,page out操作会消耗较长时间,且page out过程中队列不能处理消息,因此会出现间歇性的暂停状态、并发时出现波浪性的忽高忽低现象
  • 队列过长同时会加长RabbitMQ重启时间,因为启动时候需要重建索引
  • 队列过长还会导致集群之间节点同步消息时间变长

从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列,惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储
  • 基于磁盘存储,消息时效性会降低
  • 性能受限于磁盘的IO
  • 没有间歇性的page out,性能比较稳定 :::

    声明惰性队列

    基于@Bean方式

    :::tips 在消息消费者的配置类中声明惰性队列 ::: ```java @Configuration public class XxxConfig{

    //声明惰性队列 @Bean public Queue lazyQueue(){

    1. return QueueBuilder
    2. .durable("lazy.queue") //设置队列名称并指定为持久化队列
    3. .lazy() //设置惰性队列
    4. .build();

    }

}

  1. <a name="rB0BN"></a>
  2. ### 基于注解方式
  3. :::tips
  4. 在消息消费者的RabbitMqListener类中新增消费者,使用@RabbitListener注解的属性来声明惰性队列
  5. :::
  6. ```java
  7. //将这个类注册到Spring容器中
  8. @Component
  9. public class RabbitMqListener {
  10. //声明惰性队列,同时监听队列中的消息
  11. @RabbitListener(queuesToDeclare = @Queue(
  12. name = "lazy.queue",
  13. arguments = @Argument(name = "x-queue-mode", value = "lazy") //声明惰性队列
  14. ))
  15. public void listen(String msg){
  16. System.out.println("接收到消息:" + msg);
  17. }
  18. }