The RocketMQ source code discussed in this article is version 4.9.3.

RocketMQ Message Pull Flow

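The startup flow of MQClientInstance was covered earlier, in the article on the consumer startup process. During that startup, MQClientInstance starts PullMessageService. PullMessageService extends ServiceThread, and ServiceThread implements the Runnable interface, so the service runs in a dedicated thread of its own.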

    public class PullMessageService extends ServiceThread

    public abstract class ServiceThread implements Runnable

The run method of PullMessageService is shown below. The stopped flag that it checks through isStopped() is declared in ServiceThread:

    // Declared in ServiceThread; volatile so that a shutdown from another thread is visible to the loop
    protected volatile boolean stopped = false;

    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // Blocks until a PullRequest is available
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }
        log.info(this.getServiceName() + " service end");
    }

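As long as the service has not been stopped, the thread keeps taking PullRequest pull tasks from pullRequestQueue. If the queue is empty, take() blocks until a PullRequest is put into it; once the thread gets a PullRequest, it calls pullMessage to pull messages for it.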
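The loop above is a plain blocking producer/consumer pattern. The sketch below reproduces it outside RocketMQ so the blocking and shutdown behavior can be tried in isolation. MiniPullService and PullTask are hypothetical stand-ins for PullMessageService and PullRequest, and the assumption here is that shutdown() sets the flag and then interrupts the thread so that a blocked take() returns.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for RocketMQ's PullRequest.
class PullTask {
    final String queueId;

    PullTask(String queueId) {
        this.queueId = queueId;
    }
}

// Hypothetical, simplified model of the PullMessageService run loop (not RocketMQ code).
class MiniPullService implements Runnable {
    private final LinkedBlockingQueue<PullTask> pullRequestQueue = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();
    private volatile boolean stopped = false; // same role as ServiceThread.stopped
    private final Thread thread = new Thread(this, "MiniPullService");

    void start() {
        thread.start();
    }

    void executePullRequestImmediately(PullTask task) {
        try {
            pullRequestQueue.put(task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                PullTask task = pullRequestQueue.take(); // blocks while the queue is empty
                handled.add(task.queueId);               // stands in for pullMessage(pullRequest)
            } catch (InterruptedException ignored) {
                // woken up by shutdown(); the while condition re-checks `stopped`
            }
        }
    }

    // Sets the flag, then interrupts the thread so a blocked take() returns.
    void shutdown() {
        stopped = true;
        thread.interrupt();
        try {
            thread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    // Test helper: waits until n tasks have been handled or the timeout expires.
    boolean awaitHandled(int n, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (handled.size() < n && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return handled.size() >= n;
    }
}
```

A single thread serves every queue. Because the actual pull in step 12 is asynchronous, the loop only dispatches requests and never waits for a broker response.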

There are two ways to add a PullRequest: with a delay or immediately. The delayed version schedules a task on scheduledExecutorService that calls the immediate version once timeDelay milliseconds have passed; the immediate version puts the request straight into pullRequestQueue.

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() {
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }
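The two methods differ only in when the request reaches pullRequestQueue: the delayed variant hands a task to a scheduled executor, which calls the immediate variant once the delay expires. Here is a self-contained sketch of that pattern. RequeueSketch is a hypothetical class, requests are plain strings, and the scheduler thread is made a daemon purely for the demo.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of executePullRequestLater / executePullRequestImmediately (not RocketMQ code).
class RequeueSketch {
    final LinkedBlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(task -> {
        Thread t = new Thread(task, "pull-retry-scheduler");
        t.setDaemon(true); // do not keep the JVM alive for pending retries
        return t;
    });
    private volatile boolean stopped = false;

    void executePullRequestImmediately(String request) {
        try {
            pullRequestQueue.put(request);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void executePullRequestLater(String request, long delayMillis) {
        if (!stopped) {
            // Nothing is queued now; the scheduler thread queues the request after the delay.
            scheduler.schedule(() -> executePullRequestImmediately(request), delayMillis, TimeUnit.MILLISECONDS);
        } else {
            System.err.println("scheduler has shut down, dropping " + request);
        }
    }

    // Test helper: waits up to timeoutMillis for the next queued request.
    String pollOrNull(long timeoutMillis) {
        try {
            return pullRequestQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    void shutdown() {
        stopped = true;
        scheduler.shutdownNow();
    }
}
```

Delaying through a separate scheduler, rather than sleeping in the pull thread, keeps the single pull thread free to serve the other queues while one queue waits.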

org.apache.rocketmq.client.impl.consumer.PullMessageService#pullMessage

The message pull flow:

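Look up the MQConsumerInner registered for the request's consumer group and, depending on whether the consumer works in push or pull mode, cast it to DefaultMQPushConsumerImpl or DefaultLitePullConsumerImpl.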

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

Step 1: Get the ProcessQueue; if it has been dropped, stop here.

    final ProcessQueue processQueue = pullRequest.getProcessQueue();
    if (processQueue.isDropped()) {
        log.info("the pull request[{}] is dropped.", pullRequest.toString());
        return;
    }

Step 2: Record the timestamp of this pull.

    pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

Step 3: Make sure the consumer is running. If it is not, put the PullRequest back into the queue after a 3 s delay (pullTimeDelayMillsWhenException, 3000 ms by default).

    try {
        this.makeSureStateOK();
    } catch (MQClientException e) {
        log.warn("pullMessage exception, consumer state not ok", e);
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        return;
    }

Step 4: If the consumer has been paused, put the PullRequest back into the queue after a 1 s delay (PULL_TIME_DELAY_MILLS_WHEN_SUSPEND).

    if (this.isPause()) {
        log.warn("consumer was paused, execute pull request later. instanceName={}, group={}",
            this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        return;
    }
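Steps 1 to 4 are early-exit guards: each one either abandons the request or hands it back to the queue with a delay. The hypothetical sketch below condenses them into one function so the resulting decisions are easy to check. The delay constants mirror the values described above (3000 ms and 1000 ms); the types PrePullAction, PrePullDecision and PrePullChecks are made up for illustration.

```java
// Hypothetical types for the sketch (not RocketMQ classes).
enum PrePullAction { DROP, RETRY_LATER, PROCEED }

final class PrePullDecision {
    final PrePullAction action;
    final long delayMillis; // only meaningful for RETRY_LATER

    PrePullDecision(PrePullAction action, long delayMillis) {
        this.action = action;
        this.delayMillis = delayMillis;
    }
}

final class PrePullChecks {
    static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000; // default pullTimeDelayMillsWhenException
    static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;

    // Hypothetical stand-in for the two ProcessQueue fields these steps touch.
    static final class QueueState {
        boolean dropped;
        long lastPullTimestamp;
    }

    static PrePullDecision check(QueueState pq, boolean consumerRunning, boolean paused, long now) {
        if (pq.dropped) {                                   // step 1: queue no longer assigned
            return new PrePullDecision(PrePullAction.DROP, 0);
        }
        pq.lastPullTimestamp = now;                         // step 2
        if (!consumerRunning) {                             // step 3
            return new PrePullDecision(PrePullAction.RETRY_LATER, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
        if (paused) {                                       // step 4
            return new PrePullDecision(PrePullAction.RETRY_LATER, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
        }
        return new PrePullDecision(PrePullAction.PROCEED, 0);
    }
}
```

Passing now in as a parameter, instead of calling System.currentTimeMillis() inside, keeps the sketch deterministic.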

Step 5: If more than 1000 messages are cached in the ProcessQueue (pullThresholdForQueue), apply flow control: put the PullRequest back into the queue after 50 ms (PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL), and log a warning once every 1000 flow-control events.

    // cachedMessageCount and cachedMessageSizeInMiB are read from processQueue just before this check
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }

Step 6: If the cached messages take up more than 100 MiB (pullThresholdSizeForQueue), apply the same flow control: put the PullRequest back into the queue after 50 ms, and log a warning once every 1000 flow-control events (the counter is shared with step 5).

    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
        }
        return;
    }
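Steps 5 and 6 share the same shape and the same counter: a threshold check, a 50 ms re-queue, and a warning on the 1st, 1001st, 2001st, ... throttled attempt. The following is a hypothetical sketch in which the thresholds are passed to a constructor rather than read from DefaultMQPushConsumer, and a counter replaces the log call.

```java
// Hypothetical sketch of the count/size flow control in steps 5 and 6 (not RocketMQ code).
final class QueueFlowControl {
    static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;

    private final long pullThresholdForQueue;      // max cached messages, 1000 by default
    private final long pullThresholdSizeForQueue;  // max cached size in MiB, 100 by default
    private long queueFlowControlTimes = 0;        // shared by both checks
    int warningsLogged = 0;                        // counts where the real code would log

    QueueFlowControl(long pullThresholdForQueue, long pullThresholdSizeForQueue) {
        this.pullThresholdForQueue = pullThresholdForQueue;
        this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
    }

    /** Returns the re-queue delay in ms, or -1 if the pull may go ahead. */
    long check(long cachedMessageCount, long cachedMessageSizeBytes) {
        long cachedMessageSizeInMiB = cachedMessageSizeBytes / (1024 * 1024); // integer MiB, rounded down
        boolean overCount = cachedMessageCount > pullThresholdForQueue;
        boolean overSize = cachedMessageSizeInMiB > pullThresholdSizeForQueue;
        if (!overCount && !overSize) {
            return -1;
        }
        if ((queueFlowControlTimes++ % 1000) == 0) {
            warningsLogged++; // 1st, 1001st, 2001st ... throttled attempt
        }
        return PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL;
    }
}
```

Throttling the log with a modulo counter keeps a long stretch of flow control from flooding the log while still reporting that it is happening.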

Step 7: For concurrent (non-orderly) consumption only, the difference between the largest and the smallest offset of the messages in the ProcessQueue must not exceed 2000 (consumeConcurrentlyMaxSpan). If it does, flow control kicks in: the PullRequest is put back into the queue after 50 ms, and a warning is logged once every 1000 flow-control events.

    if (!this.consumeOrderly) {
        if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                    processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                    pullRequest, queueMaxSpanFlowControlTimes);
            }
            return;
        }
    }
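ProcessQueue keeps its cached messages in a TreeMap keyed by queue offset, so the span is simply lastKey() minus firstKey(). A large span usually means that one message near firstKey() is still being processed while much later ones have finished. Because the offset committed in concurrent mode cannot move past the oldest unfinished message, a large span also means a lot of redelivery if the consumer restarts. The sketch below is a hypothetical stand-in (SpanCheck is not a RocketMQ class) that only models the span calculation and the threshold.

```java
import java.util.TreeMap;

// Hypothetical sketch of the span check in step 7 (not RocketMQ code).
final class SpanCheck {
    static final long CONSUME_CONCURRENTLY_MAX_SPAN = 2000; // default consumeConcurrentlyMaxSpan

    // Cached messages keyed by queue offset, like ProcessQueue's msgTreeMap.
    final TreeMap<Long, String> msgTreeMap = new TreeMap<>();

    long getMaxSpan() {
        return msgTreeMap.isEmpty() ? 0 : msgTreeMap.lastKey() - msgTreeMap.firstKey();
    }

    boolean needsFlowControl() {
        return getMaxSpan() > CONSUME_CONCURRENTLY_MAX_SPAN;
    }
}
```

For example, with messages at offsets 100, 101 and 2101 the span is 2001, so pulling stops until the slow message at offset 100 completes and the span drops back to 2000.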

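Step 8: This check applies to orderly consumption only; in the 4.9.3 source the block below is the else branch of the if (!this.consumeOrderly) test in step 7. An orderly consumer may only pull from a queue it has locked on the broker, so if the ProcessQueue is not locked, the PullRequest is put back into the queue after 3 s. If the ProcessQueue is locked but the PullRequest has not been marked as previously locked (the first pull since the lock was acquired), RebalanceImpl computes the offset to pull from; if that computation throws, the request is re-queued after 3 s. If the request's nextOffset is larger than the computed offset (brokerBusy), an extra [NOTIFYME] line is logged. In either case, the request is marked as previously locked and its next pull offset is set to the computed offset.

    if (processQueue.isLocked()) {
        if (!pullRequest.isPreviouslyLocked()) {
            // First pull since the lock was acquired: recompute the starting offset
            long offset = -1L;
            try {
                offset = this.rebalanceImpl.computePullFromWhereWithException(pullRequest.getMessageQueue());
            } catch (Exception e) {
                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                log.error("Failed to compute pull offset, pullResult: {}", pullRequest, e);
                return;
            }
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                pullRequest, offset, brokerBusy);
            if (brokerBusy) {
                log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                    pullRequest, offset);
            }
            pullRequest.setPreviouslyLocked(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        // Not locked on the broker yet: try again in 3 s
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }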
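The offset in a PullRequest may be stale by the time the lock is acquired. This can happen, for example, if another instance consumed the queue before a rebalance moved it here. That is presumably why the first pull after locking replaces nextOffset with the offset computed by RebalanceImpl. The sketch below models just that gate. OrderlyPullGate is hypothetical, and the LongSupplier stands in for computePullFromWhereWithException, signalling failure with a RuntimeException instead of the checked exception used in RocketMQ.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the orderly-consumption check in step 8 (not RocketMQ code).
final class OrderlyPullGate {
    static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    static final long PROCEED = -1;

    boolean previouslyLocked = false; // mirrors PullRequest.previouslyLocked
    long nextOffset;                  // mirrors PullRequest.nextOffset

    OrderlyPullGate(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    /** Returns PROCEED, or the delay in ms before the request is re-queued. */
    long beforePull(boolean processQueueLocked, LongSupplier computePullFromWhere) {
        if (!processQueueLocked) {
            return PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION; // not locked on the broker yet
        }
        if (!previouslyLocked) {
            long offset;
            try {
                offset = computePullFromWhere.getAsLong();
            } catch (RuntimeException e) {
                return PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION; // retry; previouslyLocked stays false
            }
            boolean brokerBusy = offset < nextOffset; // only logged, never blocks the pull
            if (brokerBusy) {
                System.out.println("[NOTIFYME] request offset " + nextOffset + " > computed offset " + offset);
            }
            previouslyLocked = true;
            nextOffset = offset; // the computed offset always wins
        }
        return PROCEED;
    }
}
```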

Step 9: Look up the subscription data by topic name. If there is none, put the PullRequest back into the queue after 3 s.

    final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
    if (null == subscriptionData) {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.warn("find the consumer's subscription failed, {}", pullRequest);
        return;
    }

Step 10: Create the PullCallback that handles the response of the pull API call made in step 12. In onSuccess, pullAPIWrapper.processPullResult first decodes the messages and filters them by tag on the client side; the callback then branches on the pull status:

FOUND: update nextOffset. If no message is left after filtering, pull again immediately; otherwise put the messages into the ProcessQueue, submit them to consumeMessageService, and schedule the next pull (after pullInterval ms if it is positive, otherwise immediately)

NO_NEW_MSG / NO_MATCHED_MSG: update nextOffset, call correctTagsOffset, and pull again immediately

OFFSET_ILLEGAL: update nextOffset and mark the ProcessQueue as dropped; 10 s later, store and persist the corrected offset and remove the ProcessQueue

onException puts the request back into the queue after 3 s (the warning is not logged for retry topics).

    // beginTimestamp is recorded right before this callback is created
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                // Decode the messages and filter them by tag on the client side
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        long prevRequestOffset = pullRequest.getNextOffset();
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        long pullRT = System.currentTimeMillis() - beginTimestamp;
                        DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                            pullRequest.getMessageQueue().getTopic(), pullRT);
                        long firstMsgOffset = Long.MAX_VALUE;
                        if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                            // Everything was filtered out: pull again right away
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        } else {
                            firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                            // Cache the messages and hand them to the consume service
                            boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                            DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                pullResult.getMsgFoundList(),
                                processQueue,
                                pullRequest.getMessageQueue(),
                                dispatchToConsume);
                            // Schedule the next pull for this queue
                            if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                    DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                            } else {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            }
                        }
                        if (pullResult.getNextBeginOffset() < prevRequestOffset
                            || firstMsgOffset < prevRequestOffset) {
                            log.warn("[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                pullResult.getNextBeginOffset(),
                                firstMsgOffset,
                                prevRequestOffset);
                        }
                        break;
                    case NO_NEW_MSG:
                    case NO_MATCHED_MSG:
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        break;
                    case OFFSET_ILLEGAL:
                        log.warn("the pull request offset illegal, {} {}",
                            pullRequest.toString(), pullResult.toString());
                        pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                        pullRequest.getProcessQueue().setDropped(true);
                        // Fix and persist the offset 10 s later, then drop the ProcessQueue
                        DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
                            @Override
                            public void run() {
                                try {
                                    DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                        pullRequest.getNextOffset(), false);
                                    DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
                                    DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
                                    log.warn("fix the pull request offset, {}", pullRequest);
                                } catch (Throwable e) {
                                    log.error("executeTaskLater Exception", e);
                                }
                            }
                        }, 10000);
                        break;
                    default:
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                log.warn("execute the pull request exception", e);
            }
            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    };
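Stripped of statistics and logging, the callback is a small state machine over the pull status. The hypothetical sketch below keeps only what happens to nextOffset, the dropped flag and the next pull. SketchPullStatus copies the status names used above, PullOutcome is made up, and the returned strings merely describe the action the real code takes.

```java
// Hypothetical sketch of how PullCallback reacts to each pull status (not RocketMQ code).
enum SketchPullStatus { FOUND, NO_NEW_MSG, NO_MATCHED_MSG, OFFSET_ILLEGAL }

final class PullOutcome {
    long nextOffset;
    boolean dropped;

    PullOutcome(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    /** Applies a pull result and describes what happens to the PullRequest next. */
    String onSuccess(SketchPullStatus status, long nextBeginOffset, int foundAfterFilter, long pullInterval) {
        switch (status) {
            case FOUND:
                nextOffset = nextBeginOffset;
                if (foundAfterFilter == 0) {
                    return "pull again now"; // every message was filtered out on the client
                }
                // the real code caches the messages in ProcessQueue and submits a consume request here
                return pullInterval > 0 ? "consume, pull again after " + pullInterval + " ms" : "consume, pull again now";
            case NO_NEW_MSG:
            case NO_MATCHED_MSG:
                nextOffset = nextBeginOffset;
                return "pull again now";
            case OFFSET_ILLEGAL:
                nextOffset = nextBeginOffset;
                dropped = true; // the offset is fixed, persisted and the queue removed 10 s later
                return "drop queue";
            default:
                return "ignore";
        }
    }

    String onException() {
        return "pull again after 3000 ms";
    }
}
```

Note that only OFFSET_ILLEGAL and exceptions stop the immediate pull cycle; every other status re-queues the request right away (or after pullInterval).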

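Step 11: Build the system flag, a bitmask sent along with the pull request:

FLAG_COMMIT_OFFSET: set when the consumer runs in clustering mode and its in-memory consume offset for the queue is greater than 0, so the broker can commit that offset while serving the pull

FLAG_SUSPEND: the broker may hold (suspend) the request when no message is available, i.e. long polling

FLAG_SUBSCRIPTION: the request carries a subscription (filter) expression

FLAG_CLASS_FILTER: message filtering uses class filter mode

    int sysFlag = PullSysFlag.buildSysFlag(
        commitOffsetEnable, // commitOffset
        true, // suspend
        subExpression != null, // subscription
        classFilter // class filter
    );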
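The four booleans are packed into a single int so they travel as one request header field. Below is a hypothetical re-implementation for illustration. The bit values are assumed to match PullSysFlag in 4.9.3 (commit offset 0x1, suspend 0x2, subscription 0x4, class filter 0x8); verify them against the source before relying on them.

```java
// Hypothetical re-implementation of the PullSysFlag bit layout, for illustration only.
final class SysFlagSketch {
    static final int FLAG_COMMIT_OFFSET = 0x1;
    static final int FLAG_SUSPEND = 0x1 << 1;
    static final int FLAG_SUBSCRIPTION = 0x1 << 2;
    static final int FLAG_CLASS_FILTER = 0x1 << 3;

    static int buildSysFlag(boolean commitOffset, boolean suspend, boolean subscription, boolean classFilter) {
        int flag = 0;
        if (commitOffset) {
            flag |= FLAG_COMMIT_OFFSET;
        }
        if (suspend) {
            flag |= FLAG_SUSPEND;
        }
        if (subscription) {
            flag |= FLAG_SUBSCRIPTION;
        }
        if (classFilter) {
            flag |= FLAG_CLASS_FILTER;
        }
        return flag;
    }

    // The receiving side tests individual bits.
    static boolean hasCommitOffsetFlag(int sysFlag) {
        return (sysFlag & FLAG_COMMIT_OFFSET) == FLAG_COMMIT_OFFSET;
    }

    static boolean hasSuspendFlag(int sysFlag) {
        return (sysFlag & FLAG_SUSPEND) == FLAG_SUSPEND;
    }

    static boolean hasSubscriptionFlag(int sysFlag) {
        return (sysFlag & FLAG_SUBSCRIPTION) == FLAG_SUBSCRIPTION;
    }

    static boolean hasClassFilterFlag(int sysFlag) {
        return (sysFlag & FLAG_CLASS_FILTER) == FLAG_CLASS_FILTER;
    }
}
```

With this layout, a push consumer in clustering mode that has a stored offset and sends its expression would produce 1 | 2 | 4 = 7.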

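Step 12: Call the broker to pull messages. The call is asynchronous, and the result is delivered to the PullCallback from step 10.

    this.pullAPIWrapper.pullKernelImpl(          // meaning of each argument:
        pullRequest.getMessageQueue(),           // message queue to pull from
        subExpression,                           // message filter expression
        subscriptionData.getExpressionType(),    // filter expression type (TAG or SQL92)
        subscriptionData.getSubVersion(),        // subscription version (a timestamp)
        pullRequest.getNextOffset(),             // offset to start pulling from
        this.defaultMQPushConsumer.getPullBatchSize(), // max number of messages to pull, 32 by default
        sysFlag,                                 // system flag from step 11
        commitOffsetValue,                       // consume offset for the broker to commit
        BROKER_SUSPEND_MAX_TIME_MILLIS,          // max time the broker may hold the request, 15 s
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,    // client-side timeout, 30 s
        CommunicationMode.ASYNC,                 // asynchronous pull
        pullCallback                             // callback invoked with the pull result
    );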
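BROKER_SUSPEND_MAX_TIME_MILLIS and CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND implement long polling. If no message is available, the broker holds the request for up to 15 s and answers as soon as a message arrives, so the client-side timeout has to be longer than the hold time. The following sketch only simulates that timing inside one JVM with scaled-down values. LongPollingSketch and its methods are hypothetical, and nothing here touches the network or the RocketMQ remoting layer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical in-process simulation of broker-side long polling (not RocketMQ code).
final class LongPollingSketch {
    private final LinkedBlockingQueue<String> messages = new LinkedBlockingQueue<>();

    void append(String msg) {
        messages.offer(msg);
    }

    // "Broker": holds the request until a message arrives or suspendMillis elapses.
    String brokerPull(long suspendMillis) {
        try {
            String msg = messages.poll(suspendMillis, TimeUnit.MILLISECONDS);
            return msg == null ? "NO_NEW_MSG" : "FOUND " + msg;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        }
    }

    // "Client": gives up if no response arrives within timeoutMillis.
    String clientPull(long suspendMillis, long timeoutMillis) {
        CompletableFuture<String> response = CompletableFuture.supplyAsync(() -> brokerPull(suspendMillis));
        try {
            return response.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT"; // roughly where the real client would report a failed pull
        } catch (ExecutionException e) {
            return "ERROR";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "ERROR";
        }
    }
}
```

The last case in the checks below shows why the real defaults keep the client timeout (30 s) above the broker hold time (15 s): with the relationship reversed, a request the broker is legitimately holding would look like a failure.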