都知道 Rocketmq 中有 ConsumerGroup 的概念。在集群模式下,多台服务器配置相同的 ConsumerGroup,能够使得每次只有一台服务器消费消息(注意,但不保证只消费一次,存在网络抖动的情况)。那么,笔者就很疑惑,Rocketmq 是如何实现这个模式的?如何保证只有一台服务器消费?
虽然答案很简单,但却是一个很好的带着问题看源码的机会。

RocketMq 结构

RocketMq 中 MessageQueue 的分配 - 图1

从图中可以看到,MQ 主要投递消息和拉取消息两个环节。
众多的架构都是顺应时代潮流而来,Rocketmq 的结构体系当然也不是阿里所独创的,而是依据 AMQP 协议而来。Rocketmq 中的 Producer,Broker,以及 Consumer 都是依据 AMQP 中的概念衍生出来的。所以这里不妨讲讲 AMQP(Advanced Message Queuing Protocal,高级消息队列协议),便于大家更好的理解技术的发展过程。
paper 下载 http://www.amqp.org/specification/0-9-1/amqp-org-download
RocketMq 中 MessageQueue 的分配 - 图2

  • Broker: 接收和分发的应用
  • Virtual host: 出于多租户和安全因素,把 AMQP 的基本组件划分到一个虚拟分组中。各个租户之间是网络隔离的,类似 Linux 中的 namespace 概念(可自行 Google)
  • Connection:publisher/consumer 和 broker 之间的 TCP 连接
  • Channel:是相较于 Connection 更加轻量的连接,是 Connection 上的逻辑连接
  • Exchange: 负责将 message 分发到不同的 Queue 中
  • Queue: 消息最终会落到 Queue 中,消息由 Broker push 给 Consumer 或者由 Consumer 来 pull 消息
  • Binding:exchange 和 queue 之间的消息路由策略

    消息队列的 3 大类型

    当然基于这样一个协议,不单单是 RocketMq 一个闪耀在消息队列选型中,还有不同的消息队列。
    https://mp.weixin.qq.com/s/B1D-J_1wpaqj0sxcmaArbQ
    主要分为了 3 大阵营:

  • 有 Broker 重 Topic 流:kafka,JMS

  • 有 Broker 轻 Topic 流: RocketMQ
  • 无 Broker: ZeroMQ

当然,如果熟悉了 AMQP 协议,你也可以选择自研一个消息队列
https://zhuanlan.zhihu.com/p/28967866
了解了一些背景,来看下 RocketMQ 中消息的投递过程。还是那个具体的问题,RocketMQ 是如何选择一个队列来投递的呢?

Producer 如何投递消息到不同队列

这里提一下,RocketMq 中所有关于生产者和消费者的代码都在 client 包下。打开源码,可以看到 Procuder 下有个 selector 包,看到这个包是不是感到就是它的感觉。
可以看到 selector 下的三个类都是实现了 MessageQueueSelector,来看下 MessageQueueSelector 的代码。
public interface MessageQueueSelector { MessageQueue select(final List mqs, final Message msg, final Object arg); } public class MessageQueue { private String topic; private String brokerName; private int queueId; } 复制代码
看一下哪里调用了 MessageQueueSelector.select (),发现是 DefaultMQProducerImpl,那么可以确认就是由 MessageQueueSelector 提供了选择哪个队列。
RocketMq 提供了 3 种不同的选择队列方式:

  • SelectMessageQueueByHash
  • SelectMessageQueueByMachineRoom
  • SelectMessageQueueByRandom

    默认队列数量

    细心的同学肯定会问那么队列数量是无限大的吗?这个可以查阅 RocketMq 的使用手册,默认的队列数量是 4 (defaultTopicQueueNums: 4),当然你也可以选择自己配置。
    同时不知道有没有同学找错地儿,笔者刚开始是找错地儿了,在 TopicPublishInfo 中也找到了个 selectOneMessageQueue,代码如下。
  1. public class TopicPublishInfo{
  2. // 不同版本,代码有些不同,逻辑类似
  3. public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
  4. if (lastBrokerName != null) {
  5. int index = this.sendWhichQueue.getAndIncrement();
  6. for (int i = 0; i < this.messageQueueList.size(); i++) {
  7. int pos = Math.abs(index++) % this.messageQueueList.size();
  8. MessageQueue mq = this.messageQueueList.get(pos);
  9. if (!mq.getBrokerName().equals(lastBrokerName)) {
  10. return mq;
  11. }
  12. }
  13. return null;
  14. }
  15. else {
  16. int index = this.sendWhichQueue.getAndIncrement();
  17. int pos = Math.abs(index) % this.messageQueueList.size();
  18. return this.messageQueueList.get(pos);
  19. }
  20. }
  21. }

查了下调用方发现是 MQFaultStrategy,看来是 Rocketmq 消费失败时候,会将消息重新投递到不同的队列,这样在集群模式下能够保证分布到不同机器消费。(是不是还有疑惑,为什么能保证到不同机器,请往下看)

Consumer 如何从消息队列获取消息

这里是比较难理解的一步,首先查阅 RocketMQ 手册可以看到:
RocketMQ 的 Consumer 都是从 Broker 拉消息来消费,但是为了能做到实时收消息,RocketMQ 使用长轮询方式,可以保证消息实时性同 Push 方式一致。返种长轮询方式类似亍 Web QQ 收収消息机制。请参考以下信息了解更多。http://www.ibm.com/developerworks/cn/web/wa-lo-comet/
虽然解释的很详细,但是对新手还是不是很友好。简单的来说,就是使用长轮询,客户端发起请求和服务端先连接上,但是如果服务端没有数据,这是连接还是 hold 住,当有数据 push 给客户端的时候才关闭连接。这样不但保证了消费者不会被上游的消息打垮,也保证了消息的实时性。
那么还有个问题,Consumer 如何从 MessageQueue 上拉取消息呢?是随机拉吗?
不妨来看下 MQPullConsumer,DefaultMQPullConsumer 就是继承于它。

  1. public class MQPullConsumer {
  2. // 拉消息,非阻塞
  3. //
  4. // @param mq from which message queue
  5. // @param subExpression 订阅的tag,只支持"tag1 || tag2 || tag3"
  6. // @param offset 标志位
  7. // @param maxNums 消费最大数量
  8. PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
  9. final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
  10. InterruptedException;
  11. }

可以看到 MessageQueue 是传进来的,这就比较尴尬了,实在无法理解是什么时候决定好从哪个队列拉取消息的。幸亏有万能的搜索引擎,
https://zhuanlan.zhihu.com/p/25140744
RocketMq 有专门的类 AllocateMessageQueueStrategy.class,就藏在 Client.Consumer.rebalance 包下。

  • AllocateMessageQueueAveragely
  • AllocateMessageQueueAveragelyByCircle
  • AllocateMessageQueueByConfig
  • AllocateMessageQueueByMachineRoom
  • AllocateMessageQueueConsistentHash

每一次 Consumer 数量的变更,都会触发 AllocateMessageQueueStrategy。也就是每一次 Consumer 拉取的队列都是固定好的。
现在,在回过头来看看第一张 RocketMQ 的架构图,是不是觉得画的很透彻。

总结

  1. 任何的框架都有它衍生变化的历史,了解架构变化的历史,才能更好的理解一个框架
  2. 好好研读使用手册,包含了很多架构的细节
  3. 带着问题去研读源码


作者:飞奔的蛋蛋
链接:https://juejin.cn/post/6844903566650392590
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。