简介

一、什么是消息队列?

消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。

消息队列,一般我们会简称它为MQ(Message Queue),嗯,就是很直白的简写。

我们先不管消息(Message)这个词,来看看队列(Queue)。这一看,队列大家应该都熟悉吧。

队列是一种先进先出的数据结构。

MQ 基础知识 - 图1
先进先出
在Java里边,已经实现了不少的队列了:
MQ 基础知识 - 图2
Java的队列实现类
那为什么还需要消息队列(MQ)这种中间件呢???其实这个问题,跟之前我学Redis的时候很像。Redis是一个以key-value形式存储的内存数据库,明明我们可以使用类似HashMap这种实现类就可以达到类似的效果了,那还为什么要Redis?《Redis合集

  • 到这里,大家可以先猜猜为什么要用消息队列(MQ)这种中间件,下面会继续补充。

消息队列可以简单理解为:把要传输的数据放在队列中
MQ 基础知识 - 图3
图片来源:https://www.cloudamqp.com/blog/2014-12-03-what-is-message-queuing.html
科普:

  • 把数据放到消息队列叫做生产者
  • 从消息队列里边取数据叫做消费者

    二、为什么要用消息队列?

    为什么要用消息队列,也就是在问:用了消息队列有什么好处。我们看看以下的场景

    2.1 解耦

    现在我有一个系统A,系统A可以产生一个userId
    MQ 基础知识 - 图4
    系统A可以产生一个UserId
    然后,现在有系统B和系统C都需要这个userId去做相关的操作
    MQ 基础知识 - 图5
    系统A给系统B和系统C传入userId这个值
    写成伪代码可能是这样的:

    1. public class SystemA {
    2. // 系统B和系统C的依赖
    3. SystemB systemB = new SystemB();
    4. SystemC systemC = new SystemC();
    5. // 系统A独有的数据userId
    6. private String userId = "Java3y";
    7. public void doSomething() {
    8. // 系统B和系统C都需要拿着系统A的userId去操作其他的事
    9. systemB.SystemBNeed2do(userId);
    10. systemC.SystemCNeed2do(userId);
    11. }
    12. }

    结构图如下:
    MQ 基础知识 - 图6
    结构图
    ok,一切平安无事度过了几个天。
    某一天,系统B的负责人告诉系统A的负责人,现在系统B的SystemBNeed2do(String userId)这个接口不再使用了,让系统A别去调它了
    于是,系统A的负责人说”好的,那我就不调用你了。”,于是就把调用系统B接口的代码给删掉了

    1. public void doSomething() {
    2. // 系统A不再调用系统B的接口了
    3. //systemB.SystemBNeed2do(userId);
    4. systemC.SystemCNeed2do(userId);
    5. }

    又过了几天,系统D的负责人接了个需求,也需要用到系统A的userId,于是就跑去跟系统A的负责人说:”老哥,我要用到你的userId,你调一下我的接口吧”
    于是系统A说:”没问题的,这就搞”
    MQ 基础知识 - 图7
    系统A需要调用系统D的接口
    然后,系统A的代码如下:

    1. public class SystemA {
    2. // 已经不再需要系统B的依赖了
    3. // SystemB systemB = new SystemB();
    4. // 系统C和系统D的依赖
    5. SystemC systemC = new SystemC();
    6. SystemD systemD = new SystemD();
    7. // 系统A独有的数据
    8. private String userId = "Java3y";
    9. public void doSomething() {
    10. // 已经不再需要系统B的依赖了
    11. //systemB.SystemBNeed2do(userId);
    12. // 系统C和系统D都需要拿着系统A的userId去操作其他的事
    13. systemC.SystemCNeed2do(userId);
    14. systemD.SystemDNeed2do(userId);
    15. }
    16. }

    时间飞逝:

  • 又过了几天,系统E的负责人过来了,告诉系统A,需要userId。

  • 又过了几天,系统B的负责人过来了,告诉系统A,还是重新掉那个接口吧。
  • 又过了几天,系统F的负责人过来了,告诉系统A,需要userId。
  • ……

于是系统A的负责人,每天都被这给骚扰着,改来改去,改来改去…….
还有另外一个问题,调用系统C的时候,如果系统C挂了,系统A还得想办法处理。如果调用系统D时,由于网络延迟,请求超时了,那系统A是反馈fail还是重试??
最后,系统A的负责人,觉得隔一段时间就改来改去,没意思,于是就跑路了。
然后,公司招来一个大佬,大佬经过几天熟悉,上来就说:将系统A的userId写到消息队列中,这样系统A就不用经常改动了。为什么呢?下面我们来一起看看:
MQ 基础知识 - 图8
系统A将userId写到消息队列中,系统C和系统D从消息队列中拿数据
系统A将userId写到消息队列中,系统C和系统D从消息队列中拿数据。这样有什么好处

  • 系统A只负责把数据写到队列中,谁想要或不想要这个数据(消息),系统A一点都不关心
  • 即便现在系统D不想要userId这个数据了,系统B又突然想要userId这个数据了,都跟系统A无关,系统A一点代码都不用改。
  • 系统D拿userId不再经过系统A,而是从消息队列里边拿。系统D即便挂了或者请求超时,都跟系统A无关,只跟消息队列有关

这样一来,系统A与系统B、C、D都解耦了。

2.2 异步

我们再来看看下面这种情况:系统A还是直接调用系统B、C、D
MQ 基础知识 - 图9
直接调接口
代码如下:

  1. public class SystemA {
  2. SystemB systemB = new SystemB();
  3. SystemC systemC = new SystemC();
  4. SystemD systemD = new SystemD();
  5. // 系统A独有的数据
  6. private String userId ;
  7. public void doOrder() {
  8. // 下订单
  9. userId = this.order();
  10. // 如果下单成功,则安排其他系统做一些事
  11. systemB.SystemBNeed2do(userId);
  12. systemC.SystemCNeed2do(userId);
  13. systemD.SystemDNeed2do(userId);
  14. }
  15. }

假设系统A运算出userId具体的值需要50ms,调用系统B的接口需要300ms,调用系统C的接口需要300ms,调用系统D的接口需要300ms。那么这次请求就需要50+300+300+300=950ms
并且我们得知,系统A做的是主要的业务,而系统B、C、D是非主要的业务。比如系统A处理的是订单下单,而系统B是订单下单成功了,那发送一条短信告诉具体的用户此订单已成功,而系统C和系统D也是处理一些小事而已。
那么此时,为了提高用户体验和吞吐量,其实可以异步地调用系统B、C、D的接口。所以,我们可以弄成是这样的:
MQ 基础知识 - 图10
此时才用了100ms
系统A执行完了以后,将userId写到消息队列中,然后就直接返回了(至于其他的操作,则异步处理)。

  • 本来整个请求需要用950ms(同步)
  • 现在将调用其他系统接口异步化,从请求到返回只需要100ms(异步)

(例子可能举得不太好,但我觉得说明到点子上就行了,见谅。)

2.3削峰/限流

我们再来一个场景,现在我们每个月要搞一次大促,大促期间的并发可能会很高的,比如每秒3000个请求。假设我们现在有两台机器处理请求,并且每台机器只能每次处理1000个请求。
MQ 基础知识 - 图11
削峰的场景
那多出来的1000个请求,可能就把我们整个系统给搞崩了…所以,有一种办法,我们可以写到消息队列中:
MQ 基础知识 - 图12
写到消息队列中,系统从消息队列中拿到请求
系统B和系统C根据自己的能够处理的请求数去消息队列中拿数据,这样即便有每秒有8000个请求,那只是把请求放在消息队列中,去拿消息队列的消息由系统自己去控制,这样就不会把整个系统给搞崩。

三、使用消息队列有什么问题?

经过我们上面的场景,我们已经可以发现,消息队列能做的事其实还是蛮多的。
说到这里,我们先回到文章的开头,”明明JDK已经有不少的队列实现了,我们还需要消息队列中间件呢?”其实很简单,JDK实现的队列种类虽然有很多种,但是都是简单的内存队列。为什么我说JDK是简单的内存队列呢?下面我们来看看要实现消息队列(中间件)可能要考虑什么问题

3.1高可用

无论是我们使用消息队列来做解耦、异步还是削峰,消息队列肯定不能是单机的。试着想一下,如果是单机的消息队列,万一这台机器挂了,那我们整个系统几乎就是不可用了。
MQ 基础知识 - 图13
万一单机的队列挂掉了
所以,当我们项目中使用消息队列,都是得集群/分布式的。要做集群/分布式就必然希望该消息队列能够提供现成的支持,而不是自己写代码手动去实现。

3.2 数据丢失问题

我们将数据写到消息队列上,系统B和C还没来得及取消息队列的数据,就挂掉了。如果没有做任何的措施,我们的数据就丢了
MQ 基础知识 - 图14
数据丢失问题
学过Redis的都知道,Redis可以将数据持久化磁盘上,万一Redis挂了,还能从磁盘从将数据恢复过来。同样地,消息队列中的数据也需要存在别的地方,这样才尽可能减少数据的丢失。
那存在哪呢?

  • 磁盘?
  • 数据库?
  • Redis?
  • 分布式文件系统?

同步存储还是异步存储?

3.3消费者怎么得到消息队列的数据?

消费者怎么从消息队列里边得到数据?有两种办法:

  • 生产者将数据放到消息队列中,消息队列有数据了,主动叫消费者去拿(俗称push)
  • 消费者不断去轮训消息队列,看看有没有新的数据,如果有就消费(俗称pull)

    3.4其他

    除了这些,我们在使用的时候还得考虑各种的问题:

  • 消息重复消费了怎么办啊?

  • 我想保证消息是绝对有顺序的怎么做?
  • ……..

虽然消息队列给我们带来了那么多的好处,但同时我们发现引入消息队列也会提高系统的复杂性。市面上现在已经有不少消息队列轮子了,每种消息队列都有自己的特点,选取哪种MQ还得好好斟酌

最后

本文主要讲解了什么是消息队列,消息队列可以为我们带来什么好处,以及一个消息队列可能会涉及到哪些问题。希望给大家带来一定的帮助。
参考资料:

  • Kafka简明教程
  • 消息队列使用的四种场景介绍,有图有解析,一看就懂
  • 消息队列设计精要
  • 消息队列的使用场景是怎样的
    • https://www.zhihu.com/question/34243607

      Rabbit MQ

      rabbitmq 的使用场景有哪些?

      ①. 跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。
      ②. 多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。
      ③. 应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。
      ④. 消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。
      ⑤. 应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。
      ⑥. 跨局域网,甚至跨城市的通讯(CDN行业),比如北京机房与广州机房的应用程序的通信。

rabbitmq 有哪些重要的角色?

RabbitMQ 中重要的角色有:生产者、消费者和代理:

  • 生产者:消息的创建者,负责创建和推送数据到消息服务器;
  • 消费者:消息的接收方,用于处理数据和确认消息;
  • 代理:就是 RabbitMQ 本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

rabbitmq 有哪些重要的组件?

  • ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用。
  • Channel(信道):消息推送使用的通道。
  • Exchange(交换器):用于接受、分配消息。
  • Queue(队列):用于存储生产者的消息。
  • RoutingKey(路由键):用于把生成者的数据分配到交换器上。
  • BindingKey(绑定键):用于把交换器的消息绑定到队列上。

rabbitmq 中 vhost 的作用是什么?

vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

rabbitmq 的消息是怎么发送的?

首先客户端必须连接到 RabbitMQ 服务器才能发布和消费消息,客户端和 rabbit server 之间会创建一个 tcp 连接,一旦 tcp 打开并通过了认证(认证就是你发送给 rabbit 服务器的用户名和密码),你的客户端和 RabbitMQ 就创建了一条 amqp 信道(channel),信道是创建在“真实” tcp 上的虚拟连接,amqp 命令都是通过信道发送出去的,每个信道都会有一个唯一的 id,不论是发布消息,订阅队列都是通过这个信道完成的。

rabbitmq 怎么保证消息的稳定性?

提供了事务的功能。
通过将 channel 设置为 confirm(确认)模式。

rabbitmq 怎么避免消息丢失?

  1. 消息持久化
  2. ACK确认机制
  3. 设置集群镜像模式
  4. 消息补偿机制

要保证消息持久化成功的条件有哪些?

  1. 声明队列必须设置持久化 durable 设置为 true.
  2. 消息推送投递模式必须设置持久化,deliveryMode 设置为 2(持久)。
  3. 消息已经到达持久化交换器。
  4. 消息已经到达持久化队列。

以上四个条件都满足才能保证消息持久化成功。

rabbitmq 持久化有什么缺点?

持久化的缺地就是降低了服务器的吞吐量,因为使用的是磁盘而非内存存储,从而降低了吞吐量。可尽量使用 ssd 硬盘来缓解吞吐量的问题。

rabbitmq 有几种广播类型?

三种广播模式:

  1. fanout: 所有bind到此exchange的queue都可以接收消息(纯广播,绑定到RabbitMQ的接受者都能收到消息);
  2. direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息;
  3. topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息;

    rabbitmq 怎么实现延迟消息队列?

  4. 通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;

  5. 使用 RabbitMQ-delayed-message-exchange 插件实现延迟功能。

rabbitmq 集群有什么用?

集群主要有以下两个用途:
高可用:某个服务器出现问题,整个 RabbitMQ 还可以继续使用;
高容量:集群可以承载更多的消息量。

rabbitmq 节点的类型有哪些?

磁盘节点:消息会存储到磁盘。
内存节点:消息都存储在内存中,重启服务器消息丢失,性能高于磁盘类型。

rabbitmq 集群搭建需要注意哪些问题?

各节点之间使用“—link”连接,此属性不能忽略。
各节点使用的 erlang cookie 值必须相同,此值相当于“秘钥”的功能,用于各节点的认证。
整个集群中必须包含一个磁盘节点。

rabbitmq 每个节点是其他节点的完整拷贝吗?为什么?

不是,原因有以下两个:

  1. 存储空间的考虑:如果每个节点都拥有所有队列的完全拷贝,这样新增节点不但没有新增存储空间,反而增加了更多的冗余数据;
  2. 性能的考虑:如果每条消息都需要完整拷贝到每一个集群节点,那新增节点并没有提升处理消息的能力,最多是保持和单节点相同的性能甚至是更糟。

rabbitmq 集群中唯一一个磁盘节点崩溃了会发生什么情况?

如果唯一磁盘的磁盘节点崩溃了,不能进行以下操作:
不能创建队列
不能创建交换器
不能创建绑定
不能添加用户
不能更改权限
不能添加和删除集群节点
唯一磁盘节点崩溃了,集群是可以保持运行的,但你不能更改任何东西。

rabbitmq 对集群节点停止顺序有要求吗?

RabbitMQ 对集群的停止的顺序是有要求的,应该先关闭内存节点,最后再关闭磁盘节点。如果顺序恰好相反的话,可能会造成消息的丢失。

一:RabbitMQ 中的 broker 是指什么?cluster 又是指什么?

答:broker 是指一个或多个 erlang node 的逻辑分组,且 node 上运行着 RabbitMQ 应用程序。cluster 是在 broker 的基础之上,增加了 node 之间共享元数据的约束。

二:什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据有哪些?元数据是如何保存的?元数据在 cluster 中是如何分布的?

答:在非 cluster 模式下,元数据主要分为 Queue 元数据(queue 名字和属性等)、Exchange 元数据(exchange 名字、类型和属性等)、Binding 元数据(存放路由关系的查找表)、Vhost 元数据(vhost 范围内针对前三者的名字空间约束和安全属性设置)。在 cluster 模式下,还包括 cluster 中 node 位置信息和 node 关系信息。元数据按照 erlang node 的类型确定是仅保存于 RAM 中,还是同时保存在 RAM 和 disk 上。元数据在 cluster 中是全 node 分布的。
下图所示为 queue 的元数据在单 node 和 cluster 两种模式下的分布图。
image.png

三:RAM node 和 disk node 的区别?

答:RAM node 仅将 fabric(即 queue、exchange 和 binding等 RabbitMQ基础构件)相关元数据保存到内存中,但 disk node 会在内存和磁盘中均进行存储。RAM node 上唯一会存储到磁盘上的元数据是 cluster 中使用的 disk node 的地址。要求在 RabbitMQ cluster 中至少存在一个 disk node 。

四:RabbitMQ 上的一个 queue 中存放的 message 是否有数量限制?

答:可以认为是无限制,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。

五:RabbitMQ 概念里的 channel、exchange 和 queue 这些东东是逻辑概念,还是对应着进程实体?这些东东分别起什么作用?

答:queue 具有自己的 erlang 进程;exchange 内部实现为保存 binding 关系的查找表;channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给 queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有 AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。但一个操作系统线程上允许使用多个 channel 。channel 号为 0 的 channel 用于处理所有对于当前 connection 全局有效的帧,而 1-65535 号 channel 用于处理和特定 channel 相关的帧。AMQP 协议给出的 channel 复用模型如下
image.png
其中每一个 channel 运行在一个独立的线程上,多线程共享同一个 socket。

六:vhost 是什么?起什么作用?

答:vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。

七:在单 node 系统和多 node 构成的 cluster 系统中声明 queue、exchange ,以及进行 binding 会有什么不同?

答:当你在单 node 上声明 queue 时,只要该 node 上相关元数据进行了变更,你就会得到 Queue.Declare-ok 回应;而在 cluster 上声明 queue ,则要求 cluster 上的全部 node 都要进行元数据成功更新,才会得到 Queue.Declare-ok 回应。另外,若 node 类型为 RAM node 则变更的数据仅保存在内存中,若类型为 disk node 则还要变更保存在磁盘上的数据。

八:客户端连接到 cluster 中的任意 node 上是否都能正常工作?

答:是的。客户端感觉不到有何不同。

九:若 cluster 中拥有某个 queue 的 owner node 失效了,且该 queue 被声明具有 durable 属性,是否能够成功从其他 node 上重新声明该 queue ?

答:不能,在这种情况下,将得到 404 NOT_FOUND 错误。只能等 queue 所属的 node 恢复后才能使用该 queue 。但若该 queue 本身不具有 durable 属性,则可在其他 node 上重新声明。

十:cluster 中 node 的失效会对 consumer 产生什么影响?若是在 cluster 中创建了 mirrored queue ,这时 node 失效会对 consumer 产生什么影响?

答:若是 consumer 所连接的那个 node 失效(无论该 node 是否为 consumer 所订阅 queue 的 owner node),则 consumer 会在发现 TCP 连接断开时,按标准行为执行重连逻辑,并根据“Assume Nothing”原则重建相应的 fabric 即可。若是失效的 node 为 consumer 订阅 queue 的owner node,则 consumer 只能通过 Consumer Cancellation Notification 机制来检测与该 queue 订阅关系的终止,否则会出现傻等却没有任何消息来到的。

十一:能够在地理上分开的不同数据中心使用 RabbitMQ cluster 么?

答:不能。第一,你无法控制所创建的 queue 实际分布在 cluster 里的哪个 node 上(一般使用 HAProxy + cluster 模型时都是这样),这可能会导致各种跨地域访问时的常见问题;第二,Erlang 的 OTP 通信框架对延迟的容忍度有限,这可能会触发各种超时,导致业务疲于处理;第三,在广域网上的连接失效问题将导致经典的“脑裂”问题,而 RabbitMQ 目前无法处理(该问题主要是说 Mnesia)。

十二:为什么 heavy RPC 的使用场景下不建议采用 disk node ?

答:heavy RPC 是指在业务逻辑中高频调用 RabbitMQ 提供的 RPC 机制,导致不断创建、销毁 reply queue ,进而造成 disk node 的性能问题(因为会针对元数据不断写盘)。所以在使用 RPC 机制时需要考虑自身的业务场景。

十三:向不存在的 exchange 发 publish 消息会发生什么?向不存在的 queue 执行 consume 动作会发生什么?

答:都会收到 Channel.Close 信令告之不存在(内含原因 404 NOT_FOUND)。

十四:routing_key 和 binding_key 的最大长度是多少?

答:255 字节。

十五:RabbitMQ 允许发送的 message 最大可达多大?

答:根据 AMQP 协议规定,消息体的大小由 64-bit 的值来指定,所以你就可以知道到底能发多大的数据了。

十六:什么情况下 producer 不主动创建 queue 是安全的?

答:1.message 是允许丢失的;2.实现了针对未处理消息的 republish 功能(例如采用 Publisher Confirm 机制)。

十七:“dead letter”queue 的用途?

答:当消息被 RabbitMQ server 投递到 consumer 后,但 consumer 却通过 Basic.Reject 进行了拒绝时(同时设置 requeue=false),那么该消息会被放入“dead letter”queue 中。该 queue 可用于排查 message 被 reject 或 undeliver 的原因。

十八:为什么说保证 message 被可靠持久化的条件是 queue 和 exchange 具有 durable 属性,同时 message 具有 persistent 属性才行?

答:binding 关系可以表示为 exchange – binding – queue 。从文档中我们知道,若要求投递的 message 能够不丢失,要求 message 本身设置 persistent 属性,要求 exchange 和 queue 都设置 durable 属性。其实这问题可以这么想,若 exchange 或 queue 未设置 durable 属性,则在其 crash 之后就会无法恢复,那么即使 message 设置了 persistent 属性,仍然存在 message 虽然能恢复但却无处容身的问题;同理,若 message 本身未设置 persistent 属性,则 message 的持久化更无从谈起。

十九:什么情况下会出现 blackholed 问题?

答:blackholed 问题是指,向 exchange 投递了 message ,而由于各种原因导致该 message 丢失,但发送者却不知道。可导致 blackholed 的情况:1.向未绑定 queue 的 exchange 发送 message;2.exchange 以 binding_key key_A绑定了 queue queue_A,但向该 exchange 发送 message 使用的 routing_key 却是 key_B。

二十:如何防止出现 blackholed 问题?

答:没有特别好的办法,只能在具体实践中通过各种方式保证相关 fabric 的存在。另外,如果在执行 Basic.Publish 时设置 mandatory=true ,则在遇到可能出现 blackholed 情况时,服务器会通过返回 Basic.Return 告之当前 message 无法被正确投递(内含原因 312 NO_ROUTE)。

二十一:Consumer Cancellation Notification 机制用于什么场景?

答:用于保证当镜像 queue 中 master 挂掉时,连接到 slave 上的 consumer 可以收到自身 consume 被取消的通知,进而可以重新执行 consume 动作从新选出的 master 出获得消息。若不采用该机制,连接到 slave 上的 consumer 将不会感知 master 挂掉这个事情,导致后续无法再收到新 master 广播出来的 message 。另外,因为在镜像 queue 模式下,存在将 message 进行 requeue 的可能,所以实现 consumer 的逻辑时需要能够正确处理出现重复 message 的情况。

二十二:Basic.Reject 的用法是什么?

答:该信令可用于 consumer 对收到的 message 进行 reject 。若在该信令中设置 requeue=true,则当 RabbitMQ server 收到该拒绝信令后,会将该 message 重新发送到下一个处于 consume 状态的 consumer 处(理论上仍可能将该消息发送给当前 consumer)。若设置 requeue=false ,则 RabbitMQ server 在收到拒绝信令后,将直接将该 message 从 queue 中移除。
另外一种移除 queue 中 message 的小技巧是,consumer 回复 Basic.Ack 但不对获取到的 message 做任何处理。
而 Basic.Nack 是对 Basic.Reject 的扩展,以支持一次拒绝多条 message 的能力。

二十三:为什么不应该对所有的 message 都使用持久化机制?

答:首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于,若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性,同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。

二十四:RabbitMQ 中的 cluster、mirrored queue,以及 warrens 机制分别用于解决什么问题?存在哪些问题?

答:cluster 是为了解决当 cluster 中的任意 node 失效后,producer 和 consumer 均可以通过其他 node 继续工作,即提高了可用性;另外可以通过增加 node 数量增加 cluster 的消息吞吐量的目的。cluster 本身不负责 message 的可靠性问题(该问题由 producer 通过各种机制自行解决);cluster 无法解决跨数据中心的问题(即脑裂问题)。另外,在cluster 前使用 HAProxy 可以解决 node 的选择问题,即业务无需知道 cluster 中多个 node 的 ip 地址。可以利用 HAProxy 进行失效 node 的探测,可以作负载均衡。下图为 HAProxy + cluster 的模型。

Mirrored queue 是为了解决使用 cluster 时所创建的 queue 的完整信息仅存在于单一 node 上的问题,从另一个角度增加可用性。若想正确使用该功能,需要保证:1.consumer 需要支持 Consumer Cancellation Notification 机制;2.consumer 必须能够正确处理重复 message 。

Warrens 是为了解决 cluster 中 message 可能被 blackholed 的问题,即不能接受 producer 不停 republish message 但 RabbitMQ server 无回应的情况。Warrens 有两种构成方式,一种模型是两台独立的 RabbitMQ server + HAProxy ,其中两个 server 的状态分别为 active 和 hot-standby 。该模型的特点为:两台 server 之间无任何数据共享和协议交互,两台 server 可以基于不同的 RabbitMQ 版本。如下图所示
image.png
另一种模型为两台共享存储的 RabbitMQ server + keepalived ,其中两个 server 的状态分别为 active 和 cold-standby 。该模型的特点为:两台 server 基于共享存储可以做到完全恢复,要求必须基于完全相同的 RabbitMQ 版本。如下图所示
image.png

Warrens 模型存在的问题:对于第一种模型,虽然理论上讲不会丢失消息,但若在该模型上使用持久化机制,就会出现这样一种情况,即若作为 active 的 server 异常后,持久化在该 server 上的消息将暂时无法被 consume ,因为此时该 queue 将无法在作为 hot-standby 的 server 上被重建,所以,只能等到异常的 active server 恢复后,才能从其上的 queue 中获取相应的 message 进行处理。而对于业务来说,需要具有:a.感知 AMQP 连接断开后重建各种 fabric 的能力;b.感知 active server 恢复的能力;c.切换回 active server 的时机控制,以及切回后,针对 message 先后顺序产生的变化进行处理的能力。对于第二种模型,因为是基于共享存储的模式,所以导致 active server 异常的条件,可能同样会导致 cold-standby server 异常;另外,在该模型下,要求 active 和 cold-standby 的 server 必须具有相同的 node 名和 UID ,否则将产生访问权限问题;最后,由于该模型是冷备方案,故无法保证 cold-standby server 能在你要求的时限内成功启动。


Kafka


一、什么是Kafka?

首先我们得去官网看看是怎么介绍Kafka的:

在收集资料学习的时候,已经发现有不少的前辈对官网的介绍进行翻译和总结了,所以我这里就不重复了,贴下地址大家自行去学习啦:

我之前写过的消息队列入门文章也提到了,要做一个消息队列可能要考虑到以下的问题:

  • 使用消息队列不可能是单机的(必然是分布式or集群)
  • 数据写到消息队列,可能会存在数据丢失问题,数据在消息队列需要持久化(磁盘?数据库?Redis?分布式文件系统?)
  • 想要保证消息(数据)是有序的,怎么做?
  • 为什么在消息队列中重复消费了数据

下面我以Kafka为例对这些问题进行简单的解答,进而入门Kafka。

1.1 Kafka入门

众所周知,Kafka是一个消息队列,把消息放到队列里边的叫生产者,从队列里边消费的叫消费者
MQ 基础知识 - 图19
生产者和消费者
一个消息中间件,队列不单单只有一个,我们往往会有多个队列,而我们生产者和消费者就得知道:把数据丢给哪个队列,从哪个队列消息。我们需要给队列取名字,叫做topic(相当于数据库里边的概念)
MQ 基础知识 - 图20
给队列取名字,专业名词叫topic
现在我们给队列取了名字以后,生产者就知道往哪个队列丢数据了,消费者也知道往哪个队列拿数据了。我们可以有多个生产者往同一个队列(topic)丢数据,多个消费者往同一个队列(topic)拿数据
MQ 基础知识 - 图21
为了提高一个队列(topic)的吞吐量,Kafka会把topic进行分区(Partition)
MQ 基础知识 - 图22
Kafka分区
所以,生产者实际上是往一个topic名为Java3y中的分区(Partition)丢数据,消费者实际上是往一个topic名为Java3y的分区(Partition)取数据
MQ 基础知识 - 图23
生产者和消费者实际上操作的是分区
一台Kafka服务器叫做Broker,Kafka集群就是多台Kafka服务器:
MQ 基础知识 - 图24
Kafka集群
一个topic会分为多个partition,实际上partition会分布在不同的broker中,举个例子:
MQ 基础知识 - 图25
一个生产者丢数据给topic
由此得知:Kafka是天然分布式的

现在我们已经知道了往topic里边丢数据,实际上这些数据会分到不同的partition上,这些partition存在不同的broker上。分布式肯定会带来问题:“万一其中一台broker(Kafka服务器)出现网络抖动或者挂了,怎么办?”
Kafka是这样做的:我们数据存在不同的partition上,那kafka就把这些partition做备份。比如,现在我们有三个partition,分别存在三台broker上。每个partition都会备份,这些备份散落在不同的broker上。
MQ 基础知识 - 图26
红色代表主分区,紫色代表备份分区
红色块的partition代表的是分区,紫色的partition块代表的是备份分区。生产者往topic丢数据,是与分区交互,消费者消费topic的数据,也是与主分区交互。
备份分区仅仅用作于备份,不做读写。如果某个Broker挂了,那就会选举出其他Broker的partition来作为主分区,这就实现了高可用
另外值得一提的是:当生产者把数据丢进topic时,我们知道是写在partition上的,那partition是怎么将其持久化的呢?(不持久化如果Broker中途挂了,那肯定会丢数据嘛)。

Kafka是将partition的数据写在磁盘的(消息日志),不过Kafka只允许追加写入(顺序访问),避免缓慢的随机 I/O 操作。

  • Kafka也不是partition一有数据就立马将数据写到磁盘上,它会先缓存一部分,等到足够多数据量或等待一定的时间再批量写入(flush)。

上面balabala地都是讲生产者把数据丢进topic是怎么样的,下面来讲讲消费者是怎么消费的。既然数据是保存在partition中的,那么消费者实际上也是从partition中取数据。
MQ 基础知识 - 图27
从各个主分区取数据
生产者可以有多个,消费者也可以有多个。像上面图的情况,是一个消费者消费三个分区的数据。多个消费者可以组成一个消费者组
MQ 基础知识 - 图28
消费者组
本来是一个消费者消费三个分区的,现在我们有消费者组,就可以每个消费者去消费一个分区(也是为了提高吞吐量)
MQ 基础知识 - 图29
消费者组的每个消费者会去对应partition拿数据
按图上所示的情况,这里想要说明的是:

  • 如果消费者组中的某个消费者挂了,那么其中一个消费者可能就要消费两个partition了
  • 如果只有三个partition,而消费者组有4个消费者,那么一个消费者会空闲
  • 如果多加入一个消费者组,无论是新增的消费者组还是原本的消费者组,都能消费topic的全部数据。(消费者组之间从逻辑上它们是独立的)

前面讲解到了生产者往topic里丢数据是存在partition上的,而partition持久化到磁盘是IO顺序访问的,并且是先写缓存,隔一段时间或者数据量足够大的时候才批量写入磁盘的。
消费者在读的时候也很有讲究:正常的读磁盘数据是需要将内核态数据拷贝到用户态的,而Kafka 通过调用sendfile()直接从内核空间(DMA的)到内核空间(Socket的),少做了一步拷贝的操作。
MQ 基础知识 - 图30
Kafka 读数据 巧妙
有的同学可能会产生疑问:消费者是怎么知道自己消费到哪里的呀?Kafka不是支持回溯吗?那是怎么做的呀?

  • 比如上面也提到:如果一个消费者组中的某个消费者挂了,那挂掉的消费者所消费的分区可能就由存活的消费者消费。那存活的消费者是需要知道挂掉的消费者消费到哪了,不然怎么玩。

这里要引出offset了,Kafka就是用offset来表示消费者的消费进度到哪了,每个消费者会都有自己的offset。说白了offset就是表示消费者的消费进度
在以前版本的Kafka,这个offset是由Zookeeper来管理的,后来Kafka开发者认为Zookeeper不合适大量的删改操作,于是把offset在broker以内部topic(__consumer_offsets)的方式来保存起来。
每次消费者消费的时候,都会提交这个offset,Kafka可以让你选择是自动提交还是手动提交。
既然提到了Zookeeper,那就多说一句。Zookeeper虽然在新版的Kafka中没有用作于保存客户端的offset,但是Zookeeper是Kafka一个重要的依赖。

  • 探测broker和consumer的添加或移除。
  • 负责维护所有partition的领导者/从属者关系(主分区和备份分区),如果主分区挂了,需要选举出备份分区作为主分区。
  • 维护topic、partition等元配置信息
  • ….

MQ 基础知识 - 图31
这张图来源胡夕老师的《Kafka核心技术与实战》

最后

通过这篇文章,文章开头那几个问题估计多多少少都懂一些啦。我来简要回答一下:

使用消息队列不可能是单机的(必然是分布式or集群)

Kafka天然是分布式的,往一个topic丢数据,实际上就是往多个broker的partition存储数据

数据写到消息队列,可能会存在数据丢失问题,数据在消息队列需要持久化(磁盘?数据库?Redis?分布式文件系统?)

Kafka会将partition以消息日志的方式(落磁盘)存储起来,通过 顺序访问IO和缓存(等到一定的量或时间)才真正把数据写到磁盘上,来提高速度。

想要保证消息(数据)是有序的,怎么做?

Kafka会将数据写到partition,单个partition的写入是有顺序的。如果要保证全局有序,那只能写入一个partition中。如果要消费也有序,消费者也只能有一个。

为什么在消息队列中重复消费了数据

凡是分布式就无法避免网络抖动/机器宕机等问题的发生,很有可能消费者A读取了数据,还没来得及消费,就挂掉了。Zookeeper发现消费者A挂了,让消费者B去消费原本消费者A的分区,等消费者A重连的时候,发现已经重复消费同一条数据了。(各种各样的情况,消费者超时等等都有可能…)
如果业务上不允许重复消费的问题,最好消费者那端做业务上的校验(如果已经消费过了,就不消费了)
原文地址:https://www.yuque.com/lobotomy/java/oeb35t