RabbitMq整体架构RabbitMQ笔记 - 图2

RabbitMQ中消息只能存储到队列中,这个与Kafka中间件消息存储方式有所不同,Kafka是将消息存储在Topic的逻辑层,通过分区索引的方式进行存储,队列中存储的就是实际存储消息的位移标识。RabbitMQ 的生产者生产的消息最终被投递到了队列中,消费者可以从队列中获取消息并且进行消费。
Exchange :交换器在生产者产生消息之后并不是直接交给消息队列中,而是将消息发送到Exchange中,由交换器将消息路由到一个或者多个队列中,如果路由没有找到或者返回给生产者,或者会直接将消息丢弃调。这里只是简单地将交换器看作一个应用实体。
RoutingKey:路由键生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则,而这个RoutingKey需要和交换器的类型和绑定键(BindingKey)联合使用才能最终生效。交换器类型和绑定键在一定情况下用来执行消息的流向。
RabbitMQ 中通过绑定将交换器与队列进行关联,在绑定的时候回指定一个绑定键,这样就可以知道正确的消息路由队列如何
RabbitMQ笔记 - 图3
生产者将消息发送给交换器的时候,需要一个RoutingKey,当BindingKey和RoutingKey相互匹配的时候,消息会被路由到对应的队列中,在绑定多个队列到同一个交换器的时候,这些绑定允许使用相同的BindingKey。BindKey并不是在所有的场景中都可以生效,它依赖于对应的交换器类型,例如fanout类型的交换器就会忽视BindingKey,而是将消息路由到所有绑定的交换器队列中。
在有些场景下其实可以将RoutingKey和BindingKey看作是同一个东西
RabbitMQ笔记 - 图4

交换器类型

  RabbitMQ常用的交换器类型有fanout、direct、topic、headers这四种。AMQP协议里还有另外的两种,System和自定义。
fanout
  它会把所有转发到该交换器的消息全部路由到所有与该交换器绑定的队列上,有点像是广播,但是实际上并不是。
适用场景

  • 大量的多用户在线(multi-player online MMO)游戏使用它更新排行榜或者其他的全体事件,王者荣耀和CFM的VIP用户的大喇叭功能
  • CMS属性的门户网站及APP客户端的新闻推送
  • 分布式系统可以广播各种状态与配置更新
  • IM实时群聊也是其引用场景

direct
  direct 类型的交换器路由规则相对比较简单,是一种一一对应的关系进行路由。将BindingKey和RoutingKey完全匹配的队列进行路
topic
  上面提到的direct类型的路由交换器规则是完全匹配BindingKey和RoutingKey,但是这种严格的匹配在很多的情况下并不能满足实际业务需求,topic 类型的交换器在匹配规则上进行了扩展,它与direct类型的交换器相似,也是将消息路由到BindingKey和RoutingKey相匹配的队列中,但是用法有些不同。

RoutingKey 为一个“.” 分隔的字符串例如“com.rabbitmq.client”等
BindingKey和RoutingKey 也是通过 “.”来分隔的字符串
BindingKey 可以存在两种特殊的字符串“” 和“#” ,用于做模糊匹配,其中#匹配一个 匹配所有
例子:
RabbitMQ笔记 - 图5
适用场景:

针对部分区域性数据进行的更新,如地域性的销售网点或者分公司的调整等
由多个工作者完成的后台任务处理,每个都能够负责处理指定的任务
库存价格更新(更新其他的财务数据)
包含分类与标签的新闻更新(例如只针对某一个特定的运动或团队)
不同种类的云服务编制
分布式结构/特定操作系统软件的构建与包装,每个处理者只能处理一个结构或者系统
header
  header 类型的交换器不依赖与路由键的匹配规则来路由消息,而是根据发送消息的内容headers属性进行匹配,在绑定队列和交换器时制定一组键值对,发送消息到交换器的时候RabbitMQ回获取该消息的headers。对比其中的键值对是否匹配队列和交换器绑定的键值对,如果不匹配则不会进行路由,但是这种交换器的性能相对较差、不适用,基本在实际工作中不会用到。
Default exchange
Default exchange是一个没有名称的(空字符串)被broker预先申明的Direct exchange。它所拥有的一个特殊属性就是使它用于简单的应用程序:每个创建的queue会与它自动绑定,使用queue名称作为Routing key。
Direct exchange

Direct exchange根据Routing key完全匹配发送消息。其中两个要素:

1,将routing key的queue与exchange绑定

  1. 如果新queue中的routing key与之前绑定的routing key(bingding key)相等,exchange则将路由至新的queue中

RabbitMQ笔记 - 图6
消息的acknowledged属性,可以隐式的配置自动确认antoAck为true,也可以显式的调用baseack手动确认,这个确认ack仅仅是通知Server可以安全的删除该消息,而不是通知生产者,生产者可以调用ConfirmCallback获取是否已经发送到Exchange,如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传该消息给下一个订阅者,如果没有订阅者就会存储该消息。

1. 持久化存储机制

持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机 等)数据将会丢失。主要从以下几个方面来保障消息的持久性:

Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不 丢失。
Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不 丢失。
消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。

2. AMQP/RabbitMQ的事务机制

没有捕获到异常并不能代表消息就一定投递成功了。
一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销比较大,一般也不推荐使用

3. 发送端确认机制

RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信 道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上面面发布的消息都会被指派 一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么 确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。

4. Consumer ACK

如何保证消息被消费者成功消费?

前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制

5. 消费端限流

在电商的秒杀活动中,活动一开始会有大量并发写请求到达服务端,需要对消息进行削峰处理,如何削峰?
当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一 定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递,连锁反应那就会很悲剧… 下面我将从多个角度介绍QoS与限流,防止上面的悲剧发生。
RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直 到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内 存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已 连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和 管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着 它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。

RabbitMQ如何保证消息不丢失

RabbitMQ笔记 - 图7
数据的丢失问题,可能出现在生产者、MQ、消费者中
生产者丢失:生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit。吞吐量会下来,因为太耗性能。所以一般来说,如果你要确保说写RabbitMQ 的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息RabbitMQ 接收了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用confirm机制的。
MQ中丢失:就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。设置持久化有两个步骤:创建 queue 的时候将其设置为持久化,这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。第二个是发送消息的时候将消息的deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个 queue 里的数据。持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时RabbitMQ 挂了,就会导致内存里的一点点数据丢失

消费端丢失:你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的ack机制,简单来说,就是你关闭 RabbitMQ 的自动ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

RabbitMQ事务

5.1 RabbitMQ解决分布式事务原理
采用最终一致性原理。
需要保证以下三要素:
1.确认生产者一定要将数据投递到MQ服务器中(采用MQ消息确认机制)
2.MQ消费者消息能够正确消费消息,采用手动ACK模式(注意重试幂等性问题)
3.如何保证第一个事务先执行,采用补偿机制,在创建一个补单消费者进行监听,如果订单没有创建成功,进行补单。(如果第一个事务中出错,补单消费者会在重新执行一次第一个事务,例如第一个事务是添加订单表,如果失败在补单的时候重新生成订单记录,由于订单号唯一,所以不会重复)

分布式事务问题

每一步都有可能异常或者出现意外,比如在插入到数据库中出现异常、发送到MQ中途异常、MQ接受消息后返回确认时异常、MQ向消费者投递消息异常、消费者消费消息后向MQ进行ACK确认时异常等。每一步的异常都要进行相应的处理,不然就有可能导致脏数据。
插入到数据库中出现异常:当product插入到数据库异常,这一步还没到MQ,直接回滚。本地消息表的作用在于不过度依赖MQ,MQ中途也有可能挂掉。
发送到MQ中途异常:轮训本地消息表对没有发送到MQ的消息进行重发
MQ接受消息后返回确认时异常:和上步一样,重发到MQ,等MQ给确认收到更改本地消息
MQ向消费者投递消息异常:MQ会进行重试向消费者投递消息,只要消费者没有给MQ ACK确认,MQ就会进行重试,重试配置可根据实际业务配置
消费者消费消息后向MQ进行ACK确认时异常:和上步相同,MQ没收到ACK确认还是会重发,MQ重发时,在消费者需要进行幂等设计处理,防止脏数据
对于本地消息表,可以抽离出来做成消息服务系统,毕竟不只是一个服务在使用,抽离出来可以作为共用

RabbitMQ高可用的模式

普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。
镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据

RabbitMq死信队列

什么是死信
在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:
消息被拒绝访问,即 RabbitMQ返回 nack 的信号时
消息的 TTL 过期时
消息队列达到最大长度
消息不能入队时。
上述场景经常产生死信,即消息在这些场景中时,被称为死信。

RabbitMQ 死信队列基本使用

在 RabbitMQ 中,死信队列的标识为 x-dead-letter-exchange ,通过观察死信队列的标识,我们不难发现,其标识最后为 exchange ,即 RabbitMQ 中的交换机,RabbitMQ 中的死信队列就是由死信交换机而得出的。要想使用死信队列,我们需要首先声明一个普通的消息队列,并将死信队列的标识绑定到这个普通的消息队列上, 这个过程需要我们在生产端进行配置

为什么要用MQ

解耦:**在项目启动之初是很难预测未来会遇到什么困难的,消息中间件在处理过程中插入了一个隐含的,基于数据的接口层,两边都实现这个接口,这样就允许独立的修改或者扩展两边的处理过程,只要两边遵守相同的接口约束即可。
冗余(存储)**:在某些情况下处理数据的过程中会失败,消息中间件允许把数据持久化直到它们完全被处理。扩展性:消息中间件解耦了应用的过程,所以提供消息入队和处理的效率是很容易的,只需要增加处理流程就可以了。
削峰:**在访问量剧增的情况下,但是应用仍然需要发挥作用,但是这样的突发流量并不常见。而使用消息中间件采用队列的形式可以减少突发访问压力,不会因为突发的超时负荷要求而崩溃。
可恢复性**:当系统一部分组件失效时,不会影响到整个系统。消息中间件降低了进程间的耦合性,当一个处理消息的进程挂掉后,加入消息中间件的消息仍然可以在系统恢复后重新处理。
顺序保证:**在大多数场景下,处理数据的顺序也很重要,大部分消息中间件支持一定的顺序性。
缓冲:**消息中间件提供一定能力的缓冲数据作用,主要是用以消息的发布和订阅异步的进行通行,其运用了持久化于中间件中,对消息进行缓冲作用。
异步通信:**通过把消息发送给消息中间件,消息中间件并不立即处理。**

持久化

一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true**);即在声明的时候讲durable字段设置为true即可。
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。**为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue,exchange和Message都持久化。

  1. 消息什么时候刷到磁盘?**写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每个25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘。每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作**

    Kafka和RabbitMQ的区别:

    RabbitMQ 有这么个特性,它在官方文档就声明了自己是不保证多线程消费同一个队列的消息,一定保证顺序的。而不保证的原因,是因为多线程时,当一个线程消费消息报错的时候,RabbitMQ 会把消费失败的消息再入队,此时就可能出现乱序的情况。
    用 RabbitMQ,出现了三个问题:
    为了实现发布订阅功能,从而使用的消息复制,会降低性能并耗费更多资源
    多个消费者无法严格保证消息顺序
    大量的订单集中在一个队列,吞吐量受到了限制
    那么 Kafka 怎么样呢?Kafka 正好在这三个问题上,表现的要比 RabbitMQ 要好得多。
    首先,Kafka 的发布订阅并不会复制消息,因为 Kafka 的发布订阅就是消费者直接去获取被 Kafka 保存在日志文件中的消息就好。无论是多少消费者,他们只需要主动去找到消息在文件中的位置即可。
    其次,Kafka 不会出现消费者出错后,把消息重新入队的现象。
    最后,Kafka 可以对订单进行分区,把不同订单分到多个分区中保存,这样,吞吐量能更好
    RocketMQ有队列选择接口,传参实现,而Rabbitmq没有

    RabbitMQ事务

    一、事务使用

    事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
    channel.txSelect()声明启动事务模式**
    channel.txComment()提交事务;
    channel.txRollback()回滚事务;
    RabbitMQ笔记 - 图8
    **
    消费者模式使用事务**
    假设消费者模式中使用了事务,并且在消息确认之后进行了事务回滚,那么RabbitMQ会产生什么样的变化?
    结果分为两种情况:

  2. autoAck=false手动应对的时候是支持事务的,也就是说即使你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决之后,在做决定是确认消息还是重新放回队列,如果你手动确认现在之后,又回滚了事务,那么已事务回滚为主,此条消息会重新放回队列;

  3. autoAck=true如果自定确认为true的情况是不支持事务的,也就是说你即使在收到消息之后在回滚事务也是于事无补的,队列已经把消息移除了;

    二、Confirm发送方确认模式

    Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
    Confirm的三种实现方式:
    方式一:channel.waitForConfirms()普通发送方确认模式;
    方式二:channel.waitForConfirmsOrDie()批量确认模式;
    方式三:channel.addConfirmListener()异步监听发送方确认模式;
    RabbitMQ笔记 - 图9
    综合总体测试情况来看:Confirm批量确定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右。

    方式二:批量Confirm模式

    以上代码可以看出来channel.waitForConfirmsOrDie(),使用同步方式等所有的消息发送之后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。

    方式三:异步Confirm模式

    可以看出,代码是异步执行的,消息确认有可能是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,如果true表示批量执行了deliveryTag这个值以前的所有消息,如果为false的话表示单条确认。

    6.1 RabbitMQ消息确认机制

    rabbitmq的整个发送过程如下
    1. 生产者发送消息到消息服务
    2. 如果消息落地持久化完成,则返回一个标志给生产者。生产者拿到这个确认后,才能放心的说消息终于成功发到消息服务了。否则进入异常处理流程。
    3. 消息服务将消息发送给消费者
    4. 消费者接受并处理消息,如果处理成功则手动确认。当消息服务拿到这个确认后,才放心的说终于消费完成了。否则重发,或者进入异常处理。

    6.2 异常

    我们来看看可能发送异常的四种
    1. 直接无法到达消息服务
    网络断了,抛出异常,业务直接回滚即可。如果出现connection closed错误,直接增加 connection数即可
    2. 消息已经到达服务器,但返回的时候出现异常
    rabbitmq提供了确认ack机制,可以用来确认消息是否有返回。因此我们可以在发送前在db中(内存或关系型数据库)先存一下消息,如果ack异常则进行重发
    3. 消息送达后,消息服务自己挂了
    如果设置了消息持久化,那么ack= true是在消息持久化完成后,就是存到硬盘上之后再发送的,确保消息已经存在硬盘上,万一消息服务挂了,消息服务恢复是能够再重发消息
    4. 未送达消费者
    消息服务收到消息后,消息会处于”UNACK”的状态,直到客户端确认消息
    5. 确认消息丢失
    消息返回时假设确认消息丢失了,那么消息服务会重发消息。注意,如果你设置了autoAck= false,但又没应答channel.baskAck也没有应答channel.baskNack,那么会导致非常严重的错误:消息队列会被堵塞住,所以,无论如何都必须应答
    6. 消费者业务处理异常
    消息监听接受消息并处理,假设抛异常了,第一阶段事物已经完成,如果要配置回滚则过于麻烦,即使做事务补偿也可能事务补偿失效的情况,所以这里可以做一个重复执行,比如guava的retry,设置一个指数时间来循环执行,如果n次后依然失败,发邮件、短信,用人肉来兜底。