该文所涉及的 RocketMQ 源码版本为 4.9.3。

RocketMQ 消息消费流程

拉取消息 成功之后 会调用 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest 组装 消费消息 请求

获取 consumeMessageBatchMaxSize,表示一个 ConsumeRequest 包含的消息 数量,默认为 1

入参 msgs 为拉取消息的最大值,默认为 32

如果 msgs 小于等于 consumeMessageBatchMaxSize,直接创建ConsumeRequest任务并提交到 线程池,当出现RejectedExecutionException异常时会重新提交任务,但是查看线程池的队列

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

为无界队列,最大值为Integer.MAX_VALUE,理论上不会出现该异常

  1. if (msgs.size() <= consumeBatchSize) {
  2. ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
  3. try {
  4. this.consumeExecutor.submit(consumeRequest);
  5. } catch (RejectedExecutionException e) {
  6. this.submitConsumeRequestLater(consumeRequest);
  7. }
  8. }

如果 msgs 大于 consumeMessageBatchMaxSize,消息分批处理,即创建多个ConsumeRequest任务

  1. for (int total = 0; total < msgs.size(); ) {
  2. List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
  3. for (int i = 0; i < consumeBatchSize; i++, total++) {
  4. if (total < msgs.size()) {
  5. msgThis.add(msgs.get(total));
  6. } else {
  7. break;
  8. }
  9. }
  10. ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
  11. try {
  12. this.consumeExecutor.submit(consumeRequest);
  13. } catch (RejectedExecutionException e) {
  14. for (; total < msgs.size(); total++) {
  15. msgThis.add(msgs.get(total));
  16. }
  17. this.submitConsumeRequestLater(consumeRequest);
  18. }
  19. }

class ConsumeRequest implements Runnable

详细的消费逻辑查看 org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run

第 1 步:首先会校验队列的 dropped 是否为 true,当队列重平衡的时候,该队列可能会被分配给其他消费者,如果该队列被分配给其他消费者,会设置 dropped 为 true

  1. if (this.processQueue.isDropped()) {
  2. log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
  3. return;
  4. }

第 2 步:如果是重试消息重新设置主题

  1. public void resetRetryAndNamespace(final List<MessageExt> msgs, String consumerGroup) {
  2. final String groupTopic = MixAll.getRetryTopic(consumerGroup);
  3. for (MessageExt msg : msgs) {
  4. String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
  5. if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
  6. msg.setTopic(retryTopic);
  7. }
  8. if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
  9. msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
  10. }
  11. }
  12. }

第 3 步:如果有钩子函数则执行

  1. if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  2. consumeMessageContext = new ConsumeMessageContext();
  3. consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
  4. consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
  5. consumeMessageContext.setProps(new HashMap<String, String>());
  6. consumeMessageContext.setMq(messageQueue);
  7. consumeMessageContext.setMsgList(msgs);
  8. consumeMessageContext.setSuccess(false);
  9. ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
  10. }

第 4 步:调用消息监听器的consumeMessage执行具体的消费逻辑 ,返回值为ConsumeConcurrentlyStatus

  1. try {
  2. if (msgs != null && !msgs.isEmpty()) {
  3. for (MessageExt msg : msgs) {
  4. MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
  5. }
  6. }
  7. status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  8. } catch (Throwable e) {
  9. log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
  10. RemotingHelper.exceptionSimpleDesc(e),
  11. ConsumeMessageConcurrentlyService.this.consumerGroup,
  12. msgs,
  13. messageQueue), e);
  14. hasException = true;
  15. }
  1. public enum ConsumeConcurrentlyStatus {
  2. /**
  3. * Success consumption
  4. */
  5. CONSUME_SUCCESS,
  6. /**
  7. * Failure consumption,later try to consume
  8. */
  9. RECONSUME_LATER;
  10. }

第 5 步:如果有 钩子 函数执行钩子

  1. if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
  2. consumeMessageContext.setStatus(status.toString());
  3. consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS== status);
  4. ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
  5. }

第 6 步:再次校验队列 的 dropped 状态 ,如果为 false 才会对结果进行处理

  1. if (!processQueue.isDropped()) {
  2. ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
  3. } else {
  4. log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
  5. }

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#processConsumeResult

第 7 步:计算 ackIndex,如果为CONSUME_SUCCESS等于consumeRequest.getMsgs().size() - 1;

如果为RECONSUME_LATER等于-1

  1. switch (status) {
  2. caseCONSUME_SUCCESS:
  3. if (ackIndex >= consumeRequest.getMsgs().size()) {
  4. ackIndex = consumeRequest.getMsgs().size() - 1;
  5. }
  6. int ok = ackIndex + 1;
  7. int failed = consumeRequest.getMsgs().size() - ok;
  8. this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
  9. this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
  10. break;
  11. caseRECONSUME_LATER:
  12. ackIndex = -1;
  13. this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
  14. consumeRequest.getMsgs().size());
  15. break;
  16. default:
  17. break;
  18. }

第 8 步:如果是广播模式并且是消费失败,打印警告 信息,如果是集群模式并且消费失败会将消息发送到 broker,如果发送失败将消息封装到 consumerRequest 中延迟消费

  1. switch (this.defaultMQPushConsumer.getMessageModel()) {
  2. caseBROADCASTING:
  3. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
  4. MessageExt msg = consumeRequest.getMsgs().get(i);
  5. log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
  6. }
  7. break;
  8. caseCLUSTERING:
  9. List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
  10. for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
  11. MessageExt msg = consumeRequest.getMsgs().get(i);
  12. boolean result = this.sendMessageBack(msg, context);
  13. if (!result) {
  14. msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
  15. msgBackFailed.add(msg);
  16. }
  17. }
  18. if (!msgBackFailed.isEmpty()) {
  19. consumeRequest.getMsgs().removeAll(msgBackFailed);
  20. this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
  21. }
  22. break;
  23. default:
  24. break;
  25. }

第 9 步:更新消息消费偏移量

  1. long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
  2. if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  3. this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
  4. }