一、延时队列简介

1.1 延时队列简介

延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

1.2 延时队列应用场景

场景一:在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行一场处理。这是就可以使用延时队列将订单信息发送到延时队列。

场景二:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作。这时候就可以将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到智能设备。

Rabbitmq实现延时队列一般而言有两种形式:
第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX
第二种方式:利用rabbitmq中的插件x-delay-message
**

1.3 DLX(死信队列)

死信队列,英文缩写: DLX 。Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

image.png

二、利用TTL,DLX(死信队列)实现延时队列

还有一种插件的方式实现延时队列,自行百度。

2.1 RabbitMQConfig

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import java.util.HashMap;
  5. import java.util.Map;
  6. @Configuration
  7. public class RabbitMQConfig {
  8. //延时队列
  9. @Bean
  10. public Queue queue2() {
  11. Map<String,Object> args = new HashMap<>();
  12. //延时,单位ms
  13. args.put("x-message-ttl",10000);
  14. //延时结束后将消息发给指定死信队列
  15. args.put("x-dead-letter-exchange","fanoutExchange");
  16. //参数3:需设为false,不然运行结束,队列就会被删除
  17. return new Queue("queue2",true,false,false,args);
  18. }
  19. //延时队列交换机
  20. @Bean
  21. public DirectExchange directExchange2() {
  22. return new DirectExchange("directExchange2",true,false);
  23. }
  24. //延时队列绑定路由键
  25. @Bean
  26. public Binding bindingDirect2() {
  27. return BindingBuilder.bind(queue2()).to(directExchange2()).with("two");
  28. }
  29. //死信队列
  30. @Bean
  31. public Queue queue3() {
  32. return new Queue("queue3");
  33. }
  34. //死信队列交换机
  35. @Bean
  36. public FanoutExchange fanoutExchange() {
  37. return new FanoutExchange("fanoutExchange",true,false);
  38. }
  39. //死信队列绑定交换机
  40. @Bean
  41. public Binding bindingFanout() {
  42. return BindingBuilder.bind(queue3()).to(fanoutExchange());
  43. }
  44. }

2.2 测试类

  1. @SpringBootTest
  2. @RunWith(SpringRunner.class)
  3. class ProducerApplicationTests {
  4. //使用RabbitTemplate,这提供了接收/发送等等方法
  5. @Resource
  6. RabbitTemplate rabbitTemplate;
  7. @Test
  8. void test2() {
  9. rabbitTemplate.convertAndSend("directExchange2","two","Hello2");
  10. }
  11. }

2.3 查看结果

消息发过去10S内:
image.png
消息在延时队列中

10S过后:
image.png
消息自动转发给死信队列,所以消费者监听死信队列就能够达到效果

三、消息确认(消息可靠投递)

使用了 RabbitMQ 以后,业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。
例如:

  • 消息生产者 - > rabbitmq服务器(消息发送失败)
  • rabbitmq服务器自身故障导致消息丢失
  • 消息消费者 - > rabbitmq服务(消费消息失败)

开启消息确认机制以后,尽管很大程度上保证了消息的准确送达,但由于频繁的确认交互,rabbitmq 整体效率变低,吞吐量下降严重。

3.1 环境搭建

3.1.1 配置中开启 发送端和 消费端 的消息确认

消费端application.yml:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

生产端application.yml:

spring:
  rabbitmq:
    publisher-confirm-type: simple
    publisher-returns: true
    template:
      mandatory: true

3.1.2 生产端RabbitMQConfig

@Configuration
public class RabbitMQConfig {

    //队列
    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    //Direct交换机,可指定其他类型
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange",true,false);
    }

    //绑定  将队列和交换机绑定, 并设置路由键:one
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(queue1()).to(directExchange()).with("one");
    }

注意:DirectExchange(“directExchange”,true,false) 第三个参数autoDelete需要设置为false
**

rabbitmq的消息确认分为两部分:发送消息确认 和 消息接收确认。

**

3.2 发送消息确认

发送消息确认:用来确认生产者 Producer 将消息发送到 Broker ,Broker 上的交换机 Exchange 再投递给队列 Queue的过程中,消息是否成功投递。

  1. 消息从 Producer Rabbitmq Broker有一个 ConfirmCallback 确认模式
  2. 消息从 Exchange Queue 投递失败有一个 ReturnCallback 退回模式

我们可以利用这两个Callback来确保消息的100%送达。
image.png

3.2.1 ConfirmCallback确认模式

消息只要被 Rabbitmq Broker 接收到就会触发 ConfirmCallback 回调 。**

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(!ack){
            log.info("消息发送异常!");
        }else {
            log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。

3.2.2 ReturnCallback 退回模式

如果消息未能投递到目标 Queue 里将触发回调 ReturnCallback ,一旦向 Queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

3.2.3 测试类

@SpringBootTest
@RunWith(SpringRunner.class)
class ProducerApplicationTests {

    //使用RabbitTemplate,这提供了接收/发送等等方法
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Resource
    private ConfirmCallbackService callbackService;

    @Resource
    private ReturnCallbackService returnCallbackService;

    @Test
    void sendMessage(){

        /**
         * 确保消息发送失败后可以重新返回到队列中
         * 注意:yml需要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);
        /**
         * 消费者确认收到消息后,手动ack回执回调处理
         */
        rabbitTemplate.setConfirmCallback(callbackService);
        /**
         * 消息投递到队列失败回调处理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);
        /**
         * 发送消息
         */
        rabbitTemplate.convertAndSend("directExchange","one","Hello",message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            return message;
        },new CorrelationData(UUID.randomUUID().toString()));
    }
}

3.3 消费端确认消息

3.3.1 消费端未确认消息

如果消费端不修改代码,消息接收后,消息会进入Unacked(未确认)中,并且,每一次重启消费端都能够再次接收该消息。
image.png

3.3.2 修改消费端代码

使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数
Receiver:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
@RabbitListener(queues = "queue1")
public class Receiver {

    @RabbitHandler
    public void process(Message message, Channel channel, String str) throws IOException {
        //获得投递序号
        long tag = message.getMessageProperties().getDeliveryTag();

        try{
            log.info("收到的消息为:" + str);
            //业务代码
            //手动ack
            channel.basicAck(tag,false);
        }catch (Exception e){
            if(message.getMessageProperties().getRedelivered()){
                log.info("消息已重复处理失败,拒绝再次接收...");
                //拒绝消息
                channel.basicReject(tag,false);
            }else {
                log.info("消息即将再次返回队列处理");
                channel.basicNack(tag,false,true);
            }
        }
    }

}

此时重新启动消费端,就会接收消息并确认,然后将消息从队列中删除

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker删除。

void basicAck(long deliveryTag, boolean multiple)

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。
举例: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。
multiple:是否批量确认。
requeue:值为 true 消息将重新入队列。

3、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。
requeue:值为 true 消息将重新入队列。

四、消费端削峰限流

4.1 举例

用户端发起下单操作
服务端完成秒杀业务逻辑(库存检查,库存冻结,余额检查,余额冻结,订单生成,余额扣减,库存扣减,生成流水,余额解冻,库存解冻)
用户端下单业务简单,每秒发起了5000个请求,服务端秒杀业务复杂,每秒只能处理1000个请求,很有可能用户端不限速的下单,导致服务端A系统被压垮,引发雪崩。
image.png
为了避免雪崩,常见的优化方案有两种:
1)业务用户端队列缓冲,限速发送
2)业务服务端队列缓冲,限速执行
这里讨论的是服务端限速执行

4.2 RabbitMQ如何实现

rabbitmq提供了一种服务质量保障功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认,不进行消费新的消息。
Consumer限流机制:

  1. 确保ack机制为手动确认(manual)。
  2. listener-container配置属性perfetch=X(X代表消费端每次从MQ中拉取X条消息消费,直到手动消费确定后,才会继续去拉取X条消息)

    消费端yml配置文件

    spring:
    rabbitmq:
     host: 127.0.0.1
     port: 5672
     username: guest
     password: guest
     listener:
       simple:
         acknowledge-mode: manual
         prefetch: 100