该文所涉及的 RocketMQ 源码版本为 4.9.3。

RocketMQ 消费者启动流程

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#start

1、检查配置信息

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#checkConfig

校验消费组的长度不能大于 255

public static final int CHARACTER_MAX_LENGTH = 255;

  1. if (group.length() >CHARACTER_MAX_LENGTH) {
  2. throw new MQClientException("the specified group is longer than group max length 255.", null);
  3. }

消费组名称只能包含数字、字母、%、-、_、|

  1. // regex: ^[%|a-zA-Z0-9_-]+$
  2. // %
  3. VALID_CHAR_BIT_MAP['%'] = true;
  4. // -
  5. VALID_CHAR_BIT_MAP['-'] = true;
  6. // _
  7. VALID_CHAR_BIT_MAP['_'] = true;
  8. // |
  9. VALID_CHAR_BIT_MAP['|'] = true;
  10. for (int i = 0; i <VALID_CHAR_BIT_MAP.length; i++) {
  11. if (i >= '0' && i <= '9') {
  12. // 0-9
  13. VALID_CHAR_BIT_MAP[i] = true;
  14. } else if (i >= 'A' && i <= 'Z') {
  15. // A-Z
  16. VALID_CHAR_BIT_MAP[i] = true;
  17. } else if (i >= 'a' && i <= 'z') {
  18. // a-z
  19. VALID_CHAR_BIT_MAP[i] = true;
  20. }
  21. }
  1. public static boolean isTopicOrGroupIllegal(String str) {
  2. int strLen = str.length();
  3. int len =VALID_CHAR_BIT_MAP.length;
  4. boolean[] bitMap =VALID_CHAR_BIT_MAP;
  5. for (int i = 0; i < strLen; i++) {
  6. char ch = str.charAt(i);
  7. if (ch >= len || !bitMap[ch]) {
  8. return true;
  9. }
  10. }
  11. return false;
  12. }

消费组名称不能是DEFAULT_CONSUMER

public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";

  1. if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
  2. throw new MQClientException("consumerGroup can not equal " + MixAll.DEFAULT_CONSUMER_GROUP
  3. + ", please specify another one." + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
  4. }

消费者最小线程数需要在 1-1000 之间

  1. if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
  2. || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
  3. throw new MQClientException("consumeThreadMin Out of range [1, 1000]"
  4. + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
  5. }

消费者最大线程数需要在 1-1000 之间

  1. if (this.defaultMQPushConsumer.getConsumeThreadMax() < 1 || this.defaultMQPushConsumer.getConsumeThreadMax() > 1000) {
  2. throw new MQClientException("consumeThreadMax Out of range [1, 1000]"
  3. + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL), null);
  4. }

2、设置订阅信息

构造主题订阅消息SubscriptionData并将其加入RebalanceImpl,如果是消费模式是集群,订阅默认的重试主题并且构造SubscriptionData加入RebalanceImpl

  1. private void copySubscription() throws MQClientException {
  2. try {
  3. Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
  4. if (sub != null) {
  5. for (final Map.Entry<String, String> entry : sub.entrySet()) {
  6. final String topic = entry.getKey();
  7. final String subString = entry.getValue();
  8. SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
  9. this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
  10. }
  11. }
  12. if (null == this.messageListenerInner) {
  13. this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
  14. }
  15. switch (this.defaultMQPushConsumer.getMessageModel()) {
  16. caseBROADCASTING:
  17. break;
  18. caseCLUSTERING:
  19. final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
  20. SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
  21. this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
  22. break;
  23. default:
  24. break;
  25. }
  26. } catch (Exception e) {
  27. throw new MQClientException("subscription exception", e);
  28. }
  29. }

3、初始化MqClientInstance、RebalanceImpl、PullApiWrapper

创建MqClientInstance, 无论在生产者端还是消费者端都是一个很重要的类, 封装了Topic信息、broker信息,当然还有生产者和消费者的信息。

  1. public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
  2. String clientId = clientConfig.buildMQClientId();
  3. MQClientInstance instance = this.factoryTable.get(clientId);
  4. if (null == instance) {
  5. instance = new MQClientInstance(clientConfig.cloneClientConfig(),
  6. this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
  7. MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
  8. if (prev != null) {
  9. instance = prev;
  10. log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
  11. } else {
  12. log.info("Created new MQClientInstance for clientId:[{}]", clientId);
  13. }
  14. }
  15. return instance;
  16. }

构造RebalanceImpl 用来负载消费者与队列的消费关系

  1. this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
  2. this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
  3. this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
  4. this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

构造PullApiWrapper 消费者拉取消息类

  1. this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
  2. this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

4、设置消息偏移量

如果是广播模式消费,消息消费进度存储在消费端,如果是集群模式消费,消息消费进度存储在 broker 端

  1. if (this.defaultMQPushConsumer.getOffsetStore() != null) {
  2. this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
  3. } else {
  4. switch (this.defaultMQPushConsumer.getMessageModel()) {
  5. caseBROADCASTING:
  6. this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  7. break;
  8. caseCLUSTERING:
  9. this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
  10. break;
  11. default:
  12. break;
  13. }
  14. this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
  15. }
  16. this.offsetStore.load();

5、是否是顺序消费

根据是否是顺序消费构造不同的ConsumeMessageService

  1. if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
  2. this.consumeOrderly = true;
  3. this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
  4. } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
  5. this.consumeOrderly = false;
  6. this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
  7. }

区别在于启动的线程任务不同:

顺序消费线程:

  1. if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
  2. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. ConsumeMessageOrderlyService.this.lockMQPeriodically();
  7. } catch (Throwable e) {
  8. log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
  9. }
  10. }
  11. }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
  12. }

正常消费线程:

  1. this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. cleanExpireMsg();
  6. } catch (Throwable e) {
  7. log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
  8. }
  9. }
  10. }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);

6、启动MQClientInstance

消费者与生产者共用 MQClientInstance

大部分流程已经在生产者启动流程中讲解,这里主要讲解与生产者不同的部分

启动保证消费者偏移量最终一致性的任务

  1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. MQClientInstance.this.persistAllConsumerOffset();
  6. } catch (Exception e) {
  7. log.error("ScheduledTask persistAllConsumerOffset exception", e);
  8. }
  9. }
  10. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

启动调整线程池大小任务:

  1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  2. @Override
  3. public void run() {
  4. try {
  5. MQClientInstance.this.adjustThreadPool();
  6. } catch (Exception e) {
  7. log.error("ScheduledTask adjustThreadPool exception", e);
  8. }
  9. }
  10. }, 1, 1, TimeUnit.MINUTES);

启动重平衡服务:

this.rebalanceService.start();

7、更新订阅主题信息

更新主题订阅信息:

  1. private void updateTopicSubscribeInfoWhenSubscriptionChanged() {
  2. Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  3. if (subTable != null) {
  4. for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
  5. final String topic = entry.getKey();
  6. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
  7. }
  8. }
  9. }