RabbitMQ的结构

Rabbit MQ的结构图:
image.png

如何保证消息可靠性?

消息真的从发送端发送到Exchange了吗?

消息发送后,发送端不知道RabbitMQ是否真的收到了消息
若RabbitMQ异常,消息丢失后,订单处理流程停止,业务异常
需要使用RabbitMQ发送端确认机制,确认消息发送到RabbitMQ并被处理

Exchange把消息路由到Queue了吗?

如果消息Binding路由异常,在默认情况下,消息将会直接被丢弃。
消息丢弃后,订单处理流程停止,业务异常
需要使用RabbitMQ消息返回机制,确认消息被正确路由,若没发现目标队列,会通知发送方

消费端处理消息处理的过来吗?

业务高峰期,可能出现发送端与接收端性能不一致,大量消息被同时推送给接收端,造成接收端服务崩溃
需要使用RabbitMQ消费端限流机制,限制消息推送速度,保障接收端服务稳定

消费端处理异常怎么办?

默认情况下,消费端接收消息时,消息会被自动确认(ACK)
消费端消息处理异常时,发送端与消息中间件无法得知消息处理情况
需要使用RabbitMQ消费端确认机制,确认消息被正确处理,没有发生处理异常

消息挤压,队列爆满怎么办?

默认情况下,消息进入队列,会永远存在,直到被消费
大量堆积的消息会给RabbitMQ产生很大的压力
需要使用RabbitMQ消息过期时间,防止消息大量积压

如何转移过期消息?

消息被设置了过期时间,过期后会直接被丢弃
直接被丢弃的消息,无法对系统运行异常发出警报
需要使用RabbitMQ死信队列,收集过期消息,以供分析

发送端确认机制

消息发送后,若中间件收到消息,会给发送端一个应答,生产者接收应答,用来确认这条消息是否正常发送到中间件。

三种确认机制

单条同步确认

  1. 配置channel,开启确认模式: channel.confirmSelect()
  2. 每发送一条消息,调用channel.waitForConfirms()方法,等待确认

如果返回True,说明发送成功,RabbitMQ已经成功签收,返回False说明发送失败,RabbitMQ处理异常。
单条同步确认代码示例:

  1. ConnectionFactory connectionFactory = new ConnectionFactory();
  2. connectionFactory.setHost("localhost");
  3. // 建立连接
  4. try (Connection connection = connectionFactory.newConnection();
  5. // 创建通道
  6. Channel channel = connection.createChannel()){
  7. // 开启确认模式
  8. channel.confirmSelect();
  9. String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
  10. // 发送消息
  11. channel.basicPublish("exchange.order.restaurant","key.restaurant",null,messageToSend);
  12. log.info(" message sent");
  13. // 等待确认
  14. if (channel.waitForConfirms()){
  15. // 消息确认成功
  16. log.info("RabbitMQ confirm success");
  17. } else {
  18. // 消息确认失败,实际具体根据业务逻辑,比如修改订单状态为创建失败,或者尝试重发
  19. log.info("RabbitMQ confirm failed");
  20. }
  21. }

多条同步确认

  1. 配置channel,开启确认模式: channel.confirmSelect()
  2. 发送多条消息后,调用channel.waitForConfirms()方法,等待确认

如果返回True,说明前面多条消息一起发送成功,返回False说明前面有消息发送失败,RabbitMQ处理异常。
因为是多条,这里发送失败时是不知道哪几条消息失败的,所以不推荐用多条同步确认。
多条同步确认代码示例:

  1. ConnectionFactory connectionFactory = new ConnectionFactory();
  2. connectionFactory.setHost("localhost");
  3. // 建立连接
  4. try (Connection connection = connectionFactory.newConnection();
  5. // 创建通道
  6. Channel channel = connection.createChannel()){
  7. // 开启确认模式
  8. channel.confirmSelect();
  9. String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
  10. // 发送10条消息
  11. for (int i = 0;i < 10; i++) {
  12. // 发送消息
  13. channel.basicPublish("exchange.order.restaurant","key.restaurant",null,messageToSend);
  14. }
  15. log.info(" message sent");
  16. // 等待确认
  17. if (channel.waitForConfirms()){
  18. // 消息确认成功
  19. log.info("RabbitMQ confirm success");
  20. } else {
  21. // 消息确认失败,实际具体根据业务逻辑,比如修改订单状态为创建失败,或者尝试重发
  22. log.info("RabbitMQ confirm failed");
  23. }
  24. }

异步确认

  1. 配置channel,开启确认模式: channel.confirmSelect()
  2. 在channel上添加监听: addConfirmListener,发送消息后,会回调此方法,通知是否发送成功
  3. 异步确认有可能是单条,也有可能是多条,取决于MQ

确认是异步返回的,且异步也是有可能一次确认多条消息,所以也不推荐。
异步确认代码示例:

  1. ConnectionFactory connectionFactory = new ConnectionFactory();
  2. connectionFactory.setHost("localhost");
  3. // 建立连接
  4. try (Connection connection = connectionFactory.newConnection();
  5. // 创建通道
  6. Channel channel = connection.createChannel()){
  7. // 开启确认模式
  8. channel.confirmSelect();
  9. // 创建ConfirmListener
  10. ConfirmListener confirmListener = new ConfirmListener() {
  11. // deliveryTag跟Channel相关
  12. // 在并发情况下,会有多个Channel,所以deliveryTag全局不是唯一的
  13. @0verride
  14. public void handleAck(long deliveryTagboolean multiple) throws IOException {
  15. // ack,消息确认成功,deliveryTag表示这个channel发送的第几条消息
  16. // multiple如果是true,表示多条,表示小于等于deliveryTag的消息都被确认成功
  17. // multiple如果是false,表示单条,只确认等于deliveryTag的这一条消息
  18. log.info("Ack,deliveryTag: {}, mutiple:{}", deliveryTag, multiple);
  19. }
  20. @Override
  21. public void handleNack(long deliveryTag, boolean multiple) throws IOException{
  22. // nack,消息确认失败
  23. log.info("Nack,deliveryTag: {}, mutiple:{}", deliveryTag,multiple);
  24. }
  25. };
  26. // 把confirmListener绑定到channel上
  27. channel.addConfirmListener(confirmListener) ;
  28. String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
  29. // 发送10条消息
  30. for (int i = 0;i < 10; i++) {
  31. // 发送消息
  32. channel.basicPublish("exchange.order.restaurant","key.restaurant",null,messageToSend);
  33. }
  34. log.info(" message sent");
  35. // 线程等待,防止当前线程退出,异步线程没法执行
  36. Thread.sleep(100_000);
  37. }

消息返回机制

消息发送后,中间件会对消息进行路由,若没有发现目标队列,中间件会通知发送方

消息返回的开启方法

在RabbitMQ基础配置中有一个关键配置项:Mandatory
Mandatory若为false,RabbitMQ将直接丢弃无法路由的消息
Mandatory若为true,RabbitMQ才会处理无法路由的消息
代码示例:
如果消息路由成功,则不会调用回调方法,如果失败,消息才会调用回调方法。

  1. ConnectionFactory connectionFactory = new ConnectionFactory();
  2. connectionFactory.setHost("localhost");
  3. // 建立连接,AutoCloseable
  4. try (Connection connection = connectionFactory.newConnection();
  5. // 创建通道
  6. Channel channel = connection.createChannel()){
  7. // 添加ReturnListener
  8. channel.addReturnListener(new ReturnListener() {
  9. /**
  10. *
  11. * @param replyCode 返回状态码
  12. * @param replyText 返回信息
  13. * @param exchange 交换机
  14. * @param routingKey 路由key
  15. * @param properties 消息的元数据
  16. * @param body 消息内容
  17. * @throws IOException
  18. */
  19. @Override
  20. public void handleReturn(int replyCode, String replyText, String exchange,
  21. String routingKey, AMQP.BasicProperties properties,
  22. byte[] body) throws IOException {
  23. // 处理无法被路由的消息
  24. log.info("Message Return:replyCode:{},replyText: {}," +
  25. "exchange: {},routingKey: {},properties: {}, body: {}",
  26. replyCode, replyText, exchange, routingKey, properties,
  27. new String(body));
  28. //除了打印log,可以加别的业务操作
  29. }
  30. });
  31. String messageToSend = objectMapper.writeValueAsString(orderMessageDTO);
  32. // 发送消息,这里的参数true表示Mandatory为true,开启消息返回
  33. channel.basicPublish("exchange.order.restaurant","key.order",
  34. true,null,messageToSend);
  35. log.info(" message sent");
  36. // 防止channel关闭,channel关闭后无法调用回调方法
  37. Thread.sleep(10000);
  38. }

上面使用的是ReturnListener,还有一种方式是使用ReturnCallback,实际和上一个方法一样,只是将所有的入参封装成一个Return对象,所有的参数都可以在returnMessage中获取。

  1. channel.addReturnListener(new ReturnCallback() {
  2. @Override
  3. public void handle(Return returnMessage) {
  4. returnMessage.getReplyCode();
  5. returnMessage.getReplyText();
  6. returnMessage.getExchange();
  7. returnMessage.getRoutingKey();
  8. returnMessage.getProperties();
  9. returnMessage.getBody();
  10. }
  11. });

消费端确认机制

消息发送到消费端后,消费端需要发送一个ACK,确认消息消费成功还是失败。

消费端ACK类型

  • 自动ACK:消费端收到消息后,会自动签收消息
  • 手动ACK:消费端收到消息后,不会自动签收消息,需要我们在业务代码中显式签收消息

自动ACK是消息发送到消费端之后就会自动签收,而手动ACK是我们在业务代码处理成功后写一行代码显式签收消息,如果消息一直未被签收,当消费端断开之后,消息会重新变为Ready状态进行重新投递,消息不会发生丢失。
所以为了确保消息后续的业务处理没有异常,我们选用手动ACK,手动的ACK也有几种不同的类型。

手动ACK类型

  • 单条手动ACK: multiple=false
  • 多条手动ACK: multiple=true

推荐使用单条ACK,多条的业务上处理比较复杂。

那么我们消费消息时,需要关闭自动ACK,这需要在channel.basicConsume设置参数autoAck为false即可。
然后调用channel.basicAck进行手动Ack,并设置multiple为false表示单条Ack。
单条手动Ack代码示例:

  1. @Async
  2. public void handleMessage() throws IOException,
  3. TimeoutException, InterruptedException {
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(" localhost");
  6. try (Connection connection = connectionFactory.newConnection();
  7. Channel channel = connection.createChannel()) {
  8. // 声明交换机
  9. channel.exchangeDeclare("exchange.order.restaurant",
  10. BuiltinExchangeType.DIRECT,
  11. true, false, null);
  12. // 声明队列
  13. channel.queueDeclare("queue.restaurant",
  14. true, false, false, null);
  15. // 交换机和队列进行绑定
  16. channel.queueBind(" queue.restaurant",
  17. "exchange.order.restaurant",
  18. "key.restaurant");
  19. // 消息消费后的回调
  20. DeliverCallback deliverCallback = ((consumerTag, message) -> {
  21. String messageBody = new String(message.getBody());
  22. ObjectMapper objectMapper = new ObjectMapper();
  23. try {
  24. OrderMessageDTO orderMessageDTO = objectMapper.readValue(
  25. messageBody,
  26. OrderMessageDTO.class);
  27. // ...业务逻辑处理,保存到数据库
  28. // 这里的channel要使用basicConsume时的channel,不能new一个channel
  29. // false 表示multiple为false,是单条消息手动ACK
  30. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
  31. } catch (Exception e) {
  32. log.error(e.getMessage(), e);
  33. }
  34. });
  35. // 开始消费消息,并设置autoAck为false
  36. channel.basicConsume(
  37. " queue.restaurant", false,
  38. deliverCallback,
  39. consumerTag -> {
  40. });
  41. while (true) {
  42. Thread.sleep(100_000);
  43. }
  44. }

如果你想要多条Ack,可以这么做,比如每5条进行一次Ack:

  1. if (message.getEnvelope().getDeliveryTag()%5 == 0){
  2. channel.basicAck(message.getEnvelope().getDeliveryTag (), true);
  3. }

重回队列

设置重回队列,消费失败的消息会返回队列重新进行投递。
若设置了重回队列,消息被NACK之后,会返回队列末尾,等待进一步被处理。
一般不建议开启重回队列,因为第一次处理异常的消息,再次处理,基本上也是异常。
开启重回队列代码如下:

  1. // 参数分别是deliveryTag,multiple,requeue,requeue表示是否重回队列
  2. channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true) ;

消费端限流

  • 业务高峰期,有个微服务崩溃了,崩溃期间队列挤压了大量消息,微服务上线后,收到大量并发消息
  • 将同样多的消息推给能力不同的副本,会导致部分副本异常

    Qos

    针对以上问题,RabbitMQ开发了QoS(服务质量保证)功能
    QoS功能保证了在一定数目的消息未被确认前,不消费新的消息
    QoS功能的前提是不使用自动确认
    Qos原理
    QoS原理是当消费端有一定数量的消息未被ACK确认时,RabbitMQ不给消费端推送新的消息
    RabbitMQ使用QoS机制实现了消费端限流

    消费端限流机制参数设置

  • prefetchCount:针对一个消费端最多推送多少未确认消息

  • global: true:针对整个消费端限流 false:针对当前channel
  • prefetchSize : 0(单个消息大小限制,一般为0)
  • prefetchSize与global两项,RabbitMQ暂时未实现

3个参数是AMQP协议定义的,但是后面两项暂未实现,所以只设置prefetchCount参数就可以了。
代码示例:

  1. // 设置最多允许2条消费未被Ack
  2. channel.basicQos(2);
  3. // 开始消费消息
  4. channel.basicConsume("queue.restaurant",false,deliverCallback,consumerTag ->{});

消息过期机制

使用RabbitMQ消息过期时间,防止消息大量积压

  • RabbitMQ的过期时间称为TTL (Time to Live),生存时间
  • RabbitMQ的过期时间分为消息TTL和队列TTL
  • 消息TTL设置了单条消息的过期时间
  • 队列TTL设置了队列中所有消息的过期时间

    如何设置合适的TTL?

  • TTL的设置主要考虑技术架构与业务

  • TTL应该明显长于服务的平均重启时间
  • 建议TTL长于业务高峰期时间

不推荐直接只使用TTL,不然消息过期就会丢失,建议结合死信队列使用,将过期的消息转移到死信队列。

单条消息发送时设置TTL代码示例:

  1. // 设置消息的属性,给消息设置过期时间为15s
  2. AMQP.BasicProperties properties = new AMQP.BasicProperties()
  3. .builder().expiration("15000").build();
  4. // 发送消息
  5. channel.basicPublish("exchange.order.restaurant","key.order",
  6. true, properties, messageToSend);

声明队列时设置TTL代码示例:

  1. Map<String,Object> args = new HashMap<>(1 << 2);
  2. // 设置消息的TTL为15s
  3. args.put("x-message-ttl",15000);
  4. // 声明队列
  5. channel.queueDeclare("queue.restaurant", true, false, false, args) ;

注意:如果声明队列时没有设置ttl,后续再设置ttl会报异常,这时只能先删除这个队列,然后重新进行声明。

死信队列

消息被设置了过期时间,过期后会直接被丢弃
直接被丢弃的消息,无法对系统运行异常发出警报
所以需要将过期的消息进行转移,收集起来,以供分析。

  • 死信队列: 队列被配置了DLX属性(Dead-Letter-Exchange)
  • 当一个消息变成死信(dead message)后,能重新被发布到另一个Exchange,这个Exchange也是一个普通交换机
  • 死信被死信交换机路由后,一般进入一个固定队列

    消息变成死信的情况

  • 消息被拒绝(reject/nack)并且requeue=false

  • 消息过期(TTL到期)
  • 队列达到最大长度(通过x-max-length参数设置)

    死信队列的设置

  1. 设置转发、接收死信的交换机和队列:
  • Exchange: dlx.exchange
  • Queue: dlx.queue
  • RoutingKey: #
  1. 在需要设置死信的队列加入参数:
  • x-dead-letter-exchange = dlx.exchange

设置死信队列代码示例:

  1. Map<String,Object> args = new HashMap<>(1 << 2);
  2. // 设置消息的TTL为15s
  3. args.put("x-message-ttl",15000);
  4. // 设置死信队列,死信将投递到exchange.dlx交换机
  5. args.put ("x-dead-letter-exchange", "exchange.dlx");
  6. channel.queueDeclare("queue.restaurant", true, false, false, args) ;

接收死信的专用队列代码示例:

  1. // 声明接收死信的交换机exchange.dlx
  2. channel.exchangeDeclare(
  3. "exchange.dlx", BuiltinExchangeType.TOPIC, true,
  4. false, null);
  5. // 声明接收死信的队列queue.dlx
  6. channel.queueDeclare(
  7. "queue.dlx", true,
  8. false, false, null);
  9. // 路由绑定为#,确保所有消息都能路由到队列
  10. channel.queueBind("queue.dlx","exchange.dlx","#");

注意queue.dlx不是死信队列,它是接收死信的队列,发送死信的那个队列queue.restaurant才是死信队列。

实际经验和小结

  • 善用RabbitMQ的高级特性
    • 对于RabbitMQ的高级特性,要善加利用
    • 接收端确认、死信队列是非常常用的特性
  • 慎用RabbitMQ的高级特性
    • 不要无限追求高级,用上所有RabbitMQ的高级特性
    • 重回队列、发送端确认是不常用的特性,谨慎使用
  • 善用RabbitMQ管控台
    • 管控台是RabbitMQ调试的利器
    • RabbitMQ高级特性多数都涉及交换机、队列的属性配置,可以在管控台确认配置是否生效
    • RabbitMQ高级特性很多都可以在管控台进行试验
  • 为了确保消息发送,使用了发送端确认机制
  • 为了确保消息正确路由,使用了消息返回机制
  • 为了保证消息正常梳理,使用了消费端确认机制
  • 为了保证消费端稳定,使用消费端限流机制
  • 为了中间件问题,使用过期时间机制
  • 为了处理异常消息,使用死信机制