RabbitMQ

1. 什么是MQ?

  1. MQ全称为Message Queue,也就是消息队列,是应用程序和应用程序之间的通信方法。
  2. MQ能用来干什么?在微服务盛行的当下,MQ被使用的也是越来越多,一般常用来进行**业务异步解耦、解耦微服务、流量削峰填谷、消息分发**。

2. RabbitMQ介绍.

    RabbitMQ是mq消息队列中的一个产品。

MQ的产品:
    activeMQ -- RocketMQ -- RabbitMQ -- Kafka(大数据领域)

RabbitMQ的使用场景:
    MQ适用于一般的项目,数据量不是特别的大,不追求吞吐量,而追求稳定性、安全性、扩展性、高性能,这些正是Rabbitmq的特点。

    Kafka特点 :追求高吞吐量,适用于大数据领域。

amqp规范:
    RabbitMQ完全基于一个规范,amqp规范,

    什么是amqp规范:Advanced Message Queuing Protocol,高级消息队列协议

    而spring针对amqp的整合,只整合了RabbitMQ。

3. MQ的应用场景/三大优势.

MQ三大优势:

        1、解耦
        2、异步处理
        3、流量削峰

1、解耦:

    有了MQ之后,各个组件、服务模块之间,通过向MQ发布和订阅消息来完成通信,大大的降低了系统的耦合度。

2、异步处理.

    比如:a服务调用b服务,b调用c,c调用d,假设都是1秒,则整个服务同步调用下来,至少需要4秒;

    有了MQ之后,如果说a服务调用b服务完成后,主线的业务已经完成可以给用户响应了,则a服务直接发送消息到MQ中,然后去给用户做响应就可以了。

    其他的服务直接在MQ中订阅需要的消息就可以了,这样就以异步的方式完成了处理。

3、流量削峰.

    比如说:由于业务场景的特殊性,在某个时间段,请求的并发量会突然的剧增;

    这时网关模块来接收请求,在不做限流的情况下,假如说网关每秒可以接收1000个请求,而订单模块每秒只能处理100个请求。。。

    如果不用MQ的话,网关一下子分发给订单模块1000个请求,那么订单模块很可能就崩掉了。。。

    而在有了MQ的情况下,网关将消息发布到MQ中,然后让订单模块作为一个消费者服务模块来订阅消息,规定让订单模块每秒只从MQ中订阅100个消息来处理,从而就可以避免了order订单模块发生崩溃。

    这就是流量削峰。

4. RabbitMQ的架构.

RabbitMQ笔记 - 图1

RabbitMQ的基本组件以及执行流程:

如上图所示:

    RabbitMQ的一个进程、一个server服务称为:一个server;

    一个server可以有多个virtual host虚拟机(此虚拟机非Linux中的那种虚拟机);

RabbitMQ的3个主要组件:

1、exchange:
    交换机,生产者的消息发布之后到达MQ的第一站,exchange根据分发规则,匹配规则表中的routing key,然后分发消息到queue队列中去。

2、connection和channel:
    负责将生产者、消费者连接到MQ上;

    细分就是生产者或消费者先通过connection和RabbitMQ建立起连接,然后生产者或消费者再通过channel去发布和订阅消息。

3、queue:消息队列,用于存储还未被消费者消费的消息。

5,RabbitMQ的流程:

1. 生产者通过channel和connection连接到RabbitMQ之后,直接把消息发布到exchange交换机上;

2. 一个exchange交换机绑定着多个queue消息队列,绑定的规则叫做Routing key,绑定的方式有很多种,比如说:通配符绑定,

4.  消费者这边,它只需要订阅某一个queue消息队列即可,queue消息队列里边只要有消息,消费者就可以消费到,消息队列里边没有消息,它就一直等待着消息。

注意:
    需要注意的是:生产者和RabbitMQ通过connections建立连接的时候,就需要指定好要连接到哪一个virtual host虚拟机。

注意2:
    RabbitMQ要想顺序消费,必须一个队列中的数据只能被一个消费者消费到!

    可以有多个消费者去对应这一个队列,但是这一个队列中的消息一次只能被这多个消费者中的一个消费者消费到,其他的消费者这一次是没有消费到这个消息的。

5. RabbitMQ的7种通讯方式/工作模式.

1)简单模式:一个生产者,一个队列,一个消费者;

RabbitMQ笔记 - 图2

第一个,简单模式: 一个生产者,一个队列,一条消息,一个消费者;
     简单的通信,一个生产者发布一个消息,然后这个消息被一个消费者消费,就是简单的你那边发一个、我这边收一个。

2)work模式:一个生产者,一个队列,多个消费者;

RabbitMQ笔记 - 图3

第二个,work模式:一个生产者,一个队列,一条消息,多个消费者;
     一个生产者发布一个消息,exchange交换机再将这个消息放入到一个队列中,多个消费者共同监听这一个队列,队列中有消息时,这多个消费者开始争抢消息,谁先抢到谁消费。没抢到的接着等待。
     还是保证一条消息只能被一个消费者使用。

--- 前两种是简单的方式,你压根就不需要指定交换机,MQ会帮你自动生成  -------------

3)发布与订阅模式:一个生产者,多个队列,多个消费者;

RabbitMQ笔记 - 图4

第三个,发布与订阅模式。一个生产者,多个队列,多条消息,多个消费者;
     生产者把消息发送到exchange交换机,交换机把这个消息发送到所有的队列中,每一个队列中的消息还是对应一个消费者消费到。

4)routing路由模式:一个生产者,多个队列,多个消费者;

RabbitMQ笔记 - 图5

第四个,routing路由模式,一个生产者,多个队列,多条消息,多个消费者;
     生产者将消息发送给exchange交换机,交换机再按照routing路由判断,
     路由是字符串, 生产者提供的消息里面携带有路由字符(即,对象的方法),
     交换机根据路由的key,匹配上对应的队列,
     然后那些队列对应的消费者才能消费到消息;

5)topic通配符模式,一个生产者,多个队列,多个消费者;

RabbitMQ笔记 - 图6

第五个,topic通配符模式,一个生产者,多个队列,多条消息,多个消费者;
      生产者将消息发送给exchange交换机,
      交换机再按照通配符 匹配上对应的队列,
      然后那些队列对应的消费者才能消费到消息;

6)RPC模式:

RabbitMQ笔记 - 图7

    在RabbitMQ中做RPC是很简单的。客户端发送请求消息,服务器回复响应的消息。
流程:    
    1、消费者 向RPC请求队列发送`调用请求`,同时监听RPC响应队列。
    2、生产者 监听RPC请求队列的消息,收到消息后,进行处理 。
    3、生产者将处理结果发送到RPC响应队列;
    4、消费者从RPC响应队列,接收到处理结果。

7)ACK消费者应答模式:

RabbitMQ笔记 - 图8

第七种,ack机制模式(消费者应答模式)
     当消费者收到消息后,再通过回调函数给MQ一个ACK应答,表示已经收到了消息,然后MQ收到ack应答之后,会将这个消息从队列中删除掉,避免该消息被重复消费。

6. Rabbitmq的几种交换机模式

最新版本的RabbitMQ有四种交换机类型,分别是:Direct exchange、Fanout exchange、Topic exchange、Headers exchange。

注意: 消息始终都是先发送到交换机,由交换级经过路由传送给队列,消费者再从队列中获取消息的!

(一) Direct Exchange
 – 处理路由键。(对应的工作模式:routing路由模式)

     需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。
     如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。(一对一的匹配才会转发)

(二) Fanout Exchange
 – 不处理路由键。(对应的工作模式:发布与订阅模式)
    你只需要简单的将队列绑定到交换机上,一个发送到交换机上的消息都会被转发到所有与该交换机绑定的队列上。

(三) Topic Exchange
– 将路由键和某模式进行匹配。(对应的工作模式:topic通配符模式)
    此时队列绑定到交换机需要一个模式的匹配。
    符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
    因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。(匹配才会转发)

(四)Headers Exchanges
 - 不处理路由键。
     队列绑定到交换机,是根据交换机中的消息的内容中的headers属性进行匹配的。
     在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。

     headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

7. 如何保证生产者的消息送达MQ?

1)基于AMQP的事务 来保证:

    使用基于amqp的事务.

    amqp规范提供三个方法:
                         开启事务.
                         提交事务.
                         回滚事务.

缺陷很明显:

    AMQP的事务操作,确实可以保证消息的送达.

    但事务操作严重的影响到了RabbitMQ的性能,所以不建议使用。

2)RabbitMQ的Confirm机制 (来保证)

引言:

    事务是amqp协议中已经提供了规范的,RabbitMQ只需要实现即可;

    而confirm是RabbitMQ特有的一个机制,在amqp规范中是没有的;

confirm:确认;

生产者在发布消息前,开启confirm机制;

confirm的三种实现方式:

第一种方式:普通confirm模式,
        生产者publish了一条消息后,等待MQ服务器端来确认是否收到了消息;

        如果MQ服务端返回给生产者一个false或者因为超时未返回,

        生产者会进行消息重传。

第二种方式:批量confirm模式,
    在普通模式的基础上,改为生产者批量发送消息,通过confirm去判断批量处理是否成功,如果有一个失败,直接抛出异常。

    然后生产者会进行消息重传。

上面这两个是同步的,第三种才是异步的。

第三种方式:异步confirm模式,
    异步confirm模式的优点,就是执行效率高,生产者不需要 一直等待着 确认消息是否发出去了,它只需要开启一个监听器即可。

    MQ服务器会给生产者一个确认函数:ack或者nack

    如果生产者收到了ack函数:意味着消息发送成功.

    如果生产者收到了nack函数:意味着消息发送失败.

总体情况:第二种批量Confirm和第三种异步Confirm性能相差不大,Confirm模式要比事务快10倍左右。

8,相关面试题:

1、如何避免生产者给MQ发送重复消息?

    答:    首先呢,无论什么样的MQ都存在这种问题,其实消息重复并不可怕,可怕的是没有考虑到消息重复之后,如何保证幂等性。

    幂等性,通俗点说,对于同一种行为,如果执行不论多少次,最终的结果都是一致相同的,就称这种行为是幂等的。

    所以关键在于怎么保证消息队列消费的幂等性?

其实这需要结合具体的业务来做处理:

业务场景1:
    比如,如果这个操作本来就是幂等性的,比如:删除或者查询这样的操作,那么没有问题,重复消费也无所谓。

业务场景2:
    比如,你是通过MQ,但最终是要将数据给写入Redis中的话,那根本就不用管,Redis具有天然幂等性,每次都是set。

业务场景3:
    比如,你是通过MQ,但最终是要将数据给写入数据库中的话,那就可以设置唯一键,就是用来防止重复数据的插入,如果这数据已经存在了,那就不要插入了,仅仅update 一下、或者直接丢弃掉该消息,就可以了。

业务场景4:
    如果以上场景都不适用,那你可以使出最后的办法,让生产者每次给MQ发送消息的时候,都给消息加上一个全局唯一id(比如UUID)。

    然后MQ在收到了这个消息之后,先把这个 id 给存入到Redis。

    之后MQ每次收到生产者发来的消息时,就先去Redis中查一下,是否已存在这个全局唯一id;

    如果这个id不存在,也意味着不是重复消息,于是MQ就处理这个消息,并把这个消息的 全局唯一id 写入Redis。

    如果这个id在Redis中已经存在了,就说明已经消费过了,那就直接丢弃掉该重复消息,从而避免重复消费。<br />

2、如何保证MQ中的消息不丢失,比如MQ在接收到生产者的消息之后,直接宕机了:

    答:开启队列的消息持久化机制即可.<br />

3、生产者在发送消息时,可能由于网络延迟 导致消息丢失; 或者消息刚刚到达MQ 还没持久化就宕机了,该怎么办?

    答:
  1. amqp包含事务操作,可以启动amqp的事务操作;

  2. 如果对性能有要求的话,那么可以使用RabbitMQ的异步confirm机制;
    MQ中的消息持久化成功,则MQ返回给生产者一个ack函数,不成功则返回一个nack函数;
    如果返回给生产者的是一个nack函数,那么生产者会再次重试去发消息;

9、RabbitMQ如何保证将消息推送给消费者?

    答:**ACK消息确认机制**;

    ACK机制是指:消费者从RabbitMQ订阅到消息、并消费完成后,要给RabbitMQ一个反馈,即ACK消息确认——调用`basicAck()`方法完成,RabbitMQ在收到ACK消息确认以后,才会将此消息从队列中删除掉。

    如果一个消费者由于网络不稳定、服务器异常等现象,没有真正消费掉该消息,那么也就不会给MQ一个ACK消息确认,RabbitMQ没有收到ACK,就明白了该消息并没有被正常的消费掉,于是就会重新将该消息放入队列中,从而推送给其他的消费者。

    这样就可以保证了,万一某个消费者发生了故障,不会因此而丢失掉消息。

10,相关面试题:

1、如果消费者确实已经消费了消息,而由于网络等情况,没有成功的给MQ一个ACK反馈,这时该怎么办?
答:首先呢,只有当消费者正确的发送了ACK反馈,RabbitMQ收到之后,才会将消息给删除掉。否则,消息永远不会从RabbitMQ中删除。

    刚才这种情况造成的问题就是,RabbitMQ没有收到ACK反馈,它就会认为该消息没有被消费掉,于是就会将该消息再次放入到队列中,推送给其他的消费者,这样的话就造成了重复消费。

办法就是:

    可以开启**RabbitMQ的重试机制**,重试次数默认为3次。如果MQ把该消息一直发送给该消费者3次,都没有收到该消费者的ACK反馈,那么RabbitMQ就会将这条消息给删除掉。

2、如果忘记设置让消费者进行ACK,会产生什么问题:
    答:忘记设置让消费者进行ACK,会产生内存泄漏的问题。

    解释:忘记设置让消费者进行ACK反馈,则MQ一直没有收到ACK确认,那么该消息就一直无法从RabbitMQ中给删除掉,也就意味着形成了内存泄漏的问题;

3、消费者在接收到消息后,默认是自动应答MQ,那么如果在消费者执行逻辑操作时,出现异常,怎么办?
答:改为手动ack应答,设置只有在消费者把业务的逻辑全部成功执行之后,才会给MQ一个ack应答。

4、如何保证消费者订阅消息的顺序性.

答:

    保证消息的有序性的最彻底的方式还是:一个队列只对应着一个消费者这种方式;

    即,将一个queue消息队列对应一个consumer消费者,不要一个queue队列对应多个consumer消费者。