基础概念

什么是MQ?

什么是AMQP?

什么是RabbitMQ?

消息队列的通用架构

image.png

RabbitMQ基础概念

RabbitMQ的基础概念
image.png

RabbitMQ的六种工作模式

1. 简单模式

image.png
生产者产生数据直接发送给消息队列,消息队列再发送给消费者

2. Work queues工作队列模式

image.png

订阅模型

订阅模型需要引入交换机exchange,消息从生产者先发送至exchange交换机,交换机再根据配置将消息发送给对应的channel。下面介绍的三种模式,主要差别就是exchange根据怎么的规则发送消息给channel

3.1订阅模型-fanout(广播模式)

exchange_type = fanout
image.png
image.png

3.2 订阅模型-direct(Routing 路由模式)

exchange_type = direct
通过rounting key绑定队列和交换机

image.png
image.png

3.3订阅模型-topic(Topics 话题模式 )

exchange_type = topic
与direct模式相似,只不过direct模式的routing key是固定的字符串,而订阅模式可以有通配符
image.png
image.png
还有一种hearders模式,通过header的信息匹配,现在用的很少

4. RPC模式

image.png
SringBoot实现RabbitMQ的RPC模式

SpringBoot结合RabbitMQ

RabbitMQ高级特性

TTL

time to live 消息存活时间

过期时间,顾名思义设定一个时间后,消息自动移除。
TTL有两种设置方式:

  1. 为队列设置过期时间
  2. 为消息设置过期时间

为队列设置过期时间

队列可以设置一个过期时间x-message-ttl的属性,单位毫秒

x-message-ttl How long a queue can be unused for before it is automatically deleted (milliseconds). (Sets the “x-expires” argument.) (RabbitMQ官网)

  1. @Bean
  2. public Queue ttlQueue(){
  3. Map<String, Object> map = new HashMap<>();
  4. map.put("x-message-ttl",5000);
  5. return new Queue(RabbitMqConstant.TEST_TTL_QUEUE,true,false,false,map);
  6. }

image.png

为消息设置过期时间

  1. MessagePostProcessor mpp = new MessagePostProcessor() {
  2. @Override
  3. public Message postProcessMessage(Message message) throws AmqpException {
  4. message.getMessageProperties().setExpiration("5000");
  5. message.getMessageProperties().setContentEncoding("UTF-8");
  6. return message;
  7. }
  8. };
  9. rabbitTemplate.convertAndSend(RabbitMqConstant.TEST_FANOUT_EXCHANGE,null,"发送的消息",mpp);

二者消息过期后,都会被移除,有什么区别:

  • 过期队列里的消息过期后,会被移植死信队列;直接设置的过期消息不会;

死信队列

不能正常被消费的消息,也不能随意丢弃,会将他们放入一个特殊的死信队列。
以下情况,消息会被投递至死信队列:

  • 消息被否定确认,使用basicNackbasicReject ,并且此时requeue 属性被设置为false。
  • 消息在队列的存活时间超过设置的TTL时间。
  • 消息队列的消息数量已经超过最大队列长度。

死信交换机、死信队列本质上都是普通的交换机、队列,只是用处不同,下面看下配置过程

  1. //1. 配置死信交换机、死信队列,并将两者绑定
  2. // 定义一个死信交换机
  3. @Bean
  4. public DirectExchange deadDirectExchange(){
  5. return new DirectExchange(RabbitMqConstant.TEST_DEAD_EXCHANGE);
  6. }
  7. // 定义一个死信队列
  8. @Bean
  9. public Queue deadQueue(){
  10. return new Queue(RabbitMqConstant.TEST_DEAD_QUEUE,true,false,false);
  11. }
  12. // 绑定
  13. // 这里用的是direct 交换机,配置了routingKey
  14. @Bean
  15. public Binding deadBinding(){
  16. return BindingBuilder.bind(deadQueue()).to(deadDirectExchange()).with("dead");
  17. }
  18. // 为指定队列配置死信队列。配置后,队列满足条件时(此例中是消息过期)会将消息投递至死信队列
  19. @Bean
  20. public Queue ttlQueue(){
  21. Map<String, Object> map = new HashMap<>();
  22. // 设置队列过期时间
  23. map.put("x-message-ttl",5000);
  24. // 为队列设置死信交换机
  25. map.put(RabbitMqConstant.X_DEAD_LETTER_EXCHANGE,RabbitMqConstant.TEST_DEAD_EXCHANGE);
  26. // 设置routingKey(死信交换机为fanout时,不需要配置)
  27. map.put(RabbitMqConstant.X_DEAD_LETTER_ROUTING_KEY,"dead");
  28. return new Queue(RabbitMqConstant.TEST_TTL_QUEUE,true,false,false,map);
  29. }

延迟队列

有时我们希望队列中的消息延迟一定时间再处理。RabbitMQ本身不支持这个功能,但是可以通过死信队列间接实现。将死信队列作为最终处理的目标队列就好,设置TTL的队列只作为中转。

消息确认

下图是简易的消息发送过程,共有三处需要进行消息确认:

  1. 生产者发送消息值Broker
  2. Broker中交换机收到消息后把消息投递给队列
  3. 队列将消息发给消费者
  1. 在 RabbitMQ 中 有两种事务机制来确保消息的安全送达,分别是事务机制和确认机制;
  2. 事务机制需要每个消息或一组消息发布、提交的通道设置为事务性的,因此会非常耗费性能,降低了 Rabbitmq 的消息吞吐量;
  3. 因此我们在实际生产中通常采用确认机制

2. RabbitMQ基础知识 - 图13

生产者消息确认

提供了ConfirmCallbackReturnCallback两个回调来进行确认。
要使用确认回调,需要在配置里手动设置一下:

  1. spring.rabbitmq.publisher-confirm-type: correlated
  2. spring.rabbitmq.publisher-returns: true
  1. // 为RabbitTemplate 设置回调
  2. @Bean
  3. RabbitTemplate rabbitTemplate() {
  4. RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
  5. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
  6. @Override
  7. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  8. if (ack) {
  9. log.info("消息发送成功:{}" + correlationData.getId());
  10. } else {
  11. log.info("消息发送食材:{}" + correlationData);
  12. }
  13. }
  14. });
  15. // confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。
  16. // 在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式
  17. // 这样如果未能投递到目标 queue 里将调用 returnCallback,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据
  18. rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
  19. //Message message表示的是消息体,int replyCode表示响应码,String replyText表示响应内容,String exchange表示发送消息时指定的交换机,String routingKey表示发送消息时指定的routing key
  20. public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  21. log.info(MessageFormat.format("未能投递到目标队列,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode,replyText, exchange, routingKey));
  22. }
  23. });
  24. return rabbitTemplate;
  25. }

ConfirmCallback

  1. 消息只要被 broker 接收到就会执行 confirm()
  2. 如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback
  3. 被 broker 接收到只能表示 消息 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里

    ReturnCallback

  4. confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里,要确认是否投递到目标 queue 里,此时就需要用到 return 退回模式

  5. 这样如果未能投递到目标 queue 里将调用 returnCallback,可以记录下详细到投递数据, 记录下来再处理

消费者消息确认

RabbitMQ需要知道消费者是否正确收到/处理了消息(判断逻辑由业务业务决定),收到确认后在队列中删除消息。这就需要消费者给RabbitMQ发送应答消息。
有两种应答方式:自动应答、手动应答

根据情况确认

acknowledge-mode: auto

自动应答

acknowledge-mode: none
消费者在消费消息的时候,如果设定应答模式为自动,则消费者收到消息后,消息就会立即被 RabbitMQ 从 队列中删除掉。但往往我们的业务还需要执行,是否应答由业务决定,因此实际开发中,更多用手动应答

手动应答

acknowledge-mode: manual

  • 可以在既定的正常情况下进行确认(告诉 RabbitMQ,我已经消费过该消息了,你可以删除该条数据了)
  • 可以在既定的异常情况下不进行确认(RabbitMQ 会继续保留该条数据),这样下一次可以继续消费该条数据。

开启手动应答模式

  1. spring:
  2. rabbitmq:
  3. listener:
  4. direct:
  5. acknowledge-mode: manual

手动应答:


        /**
         * 确认消息
         * multiple参数:true 确认所有消息;false 只确认当前这一条消息
         */
        channel.basicAck(tag,false);

        /**
         * 拒绝消息
         * 第二个参数multiple:
         *                                    true 确认所有消息;
         *                  false 只确认当前这一条消息
         * 第三个参数requeue:
         *                                    true 将消息重新放回队列(且后面还有可能消费此消息)
         *                  false 将消息丢弃,如果配置了死信队列则放入死信队列
         *
         */
        channel.basicNack(tag,false,false);

        /**
         * 重新投递此消息
         * requeue参数:
         *                        false 将消息重新投递给自己
         *            true 消息放回队列,并尽可能投递给其他消费者
         */
        channel.basicRecover(false);

        /**
         * 拒绝消息
         * 第二个参数requeue:true 将消息重新放回队列(且后面还有可能消费此消息)
         *                  false 将消息丢弃,如果配置了死信队列则放入死信队列
         */
        channel.basicReject(tag,false);

关于确认模式 acknowledge-mode 有三种模式

RabbitMQ集群

RabbitMQ实现分布式事务

RabbitMQ 有三种模式:单机模式普通集群模式镜像集群模式

  • 单机模式:就是demo级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式

  • 普通集群模式:意思就是在多台机器上启动多个RabbitMQ实例,每个机器启动一个。

  • 镜像集群模式:这种模式,才是所谓的RabbitMQ的高可用模式,跟普通集群模式不一样的是,你创建的queue,无论元数据(元数据指RabbitMQ的配置数据)还是queue里的消息都会存在于多个实例上,然后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。