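The RocketMQ source code referenced in this article is version 4.9.3.

RocketMQ message sending flow

Synchronous sending is used as the example here:

Entry point:

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)

The default send timeout is 3 seconds

Step 1: Validation

The topic length must not exceed 127 characters, and the message body must not be null, empty, or larger than 4 MB (the default maxMessageSize)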

    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
        if (null == msg) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }
        // topic
        Validators.checkTopic(msg.getTopic());
        Validators.isNotAllowedSendTopic(msg.getTopic());
        // body
        if (null == msg.getBody()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        if (0 == msg.getBody().length) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }
        if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }
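
To make the order of the checks concrete, here is a minimal standalone sketch of the same rules. `SendValidationSketch` and `firstViolation` are hypothetical names used only for illustration, not RocketMQ classes, and the topic character-set check and the reserved-topic check performed by `Validators` are deliberately left out.

```java
// Hypothetical stand-in for the client-side checks described above; not RocketMQ code.
final class SendValidationSketch {
    static final int TOPIC_MAX_LENGTH = 127;             // limit stated in the text
    static final int MAX_MESSAGE_SIZE = 4 * 1024 * 1024; // 4 MB default stated in the text

    /** Returns null when the message passes, otherwise a description of the first failed check. */
    static String firstViolation(String topic, byte[] body, int maxMessageSize) {
        if (topic == null || topic.trim().isEmpty()) {
            return "the specified topic is blank";
        }
        if (topic.length() > TOPIC_MAX_LENGTH) {
            return "topic longer than " + TOPIC_MAX_LENGTH;
        }
        if (body == null) {
            return "the message body is null";
        }
        if (body.length == 0) {
            return "the message body length is zero";
        }
        if (body.length > maxMessageSize) {
            return "the message body size over max value, MAX: " + maxMessageSize;
        }
        return null;
    }
}
```

Returning a description instead of throwing keeps the sketch easy to exercise; the real method throws MQClientException at the first failed check.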

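Step 2: Look up the route info

If the cache already holds route info for the topic and its queue list is not empty, that route info is returned directly. Otherwise the route info is fetched from the NameServer for the current topic. If it still cannot be found, the route info of the default topic is queried instead, and if that also fails the send ends with an exception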

    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
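
The cache, NameServer, default-topic fallback order can be sketched in isolation as follows. This is a simplified model under stated assumptions: `RouteLookupSketch` is a hypothetical class, the NameServer is replaced by a plain function that returns a queue list or null, and the exception is thrown inside the lookup rather than by the caller.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical model of the three-level lookup; the NameServer is just a function here.
final class RouteLookupSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // topic -> queues, or null if unknown
    private final String defaultTopic;

    RouteLookupSketch(Function<String, List<String>> nameServer, String defaultTopic) {
        this.nameServer = nameServer;
        this.defaultTopic = defaultTopic;
    }

    List<String> find(String topic) {
        List<String> queues = cache.get(topic);
        if (queues != null && !queues.isEmpty()) {
            return queues;                               // 1. cache hit with a non-empty queue list
        }
        queues = nameServer.apply(topic);                // 2. ask the NameServer for this topic
        if (queues == null || queues.isEmpty()) {
            queues = nameServer.apply(defaultTopic);     // 3. fall back to the default topic
        }
        if (queues == null || queues.isEmpty()) {
            throw new IllegalStateException("No route info of this topic: " + topic);
        }
        cache.put(topic, queues);
        return queues;
    }
}
```

Because a successful lookup is cached, later sends to the same topic do not contact the NameServer again in this sketch; the real client also refreshes routes periodically, which is not modeled.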

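Method that queries route info from the NameServer:

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)

1. When the route info is queried through the default topic and the query succeeds, both the read and write queue counts are set to the smaller of the producer's defaultTopicQueueNums and the read queue count returned for the default topic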

    if (isDefault && defaultMQProducer != null) {
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
            clientConfig.getMqClientApiTimeout());
        if (topicRouteData != null) {
            for (QueueData data : topicRouteData.getQueueDatas()) {
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                data.setReadQueueNums(queueNums);
                data.setWriteQueueNums(queueNums);
            }
        }
    }
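
A small worked example of the clamping rule may help. The helper below is hypothetical and only restates the `Math.min` call from the snippet above; the sample numbers are illustrative, not values read from any configuration.

```java
// Hypothetical helper restating the Math.min rule above.
final class DefaultTopicQueueNumsSketch {
    /** Queue count applied to both read and write queues when routing through the default topic. */
    static int effectiveQueueNums(int producerDefaultTopicQueueNums, int defaultTopicReadQueueNums) {
        return Math.min(producerDefaultTopicQueueNums, defaultTopicReadQueueNums);
    }
}
```

With a producer setting of 4 and a default topic that reports 8 read queues, the result is 4; with a producer setting of 16 it is capped at 8.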

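2. Once the route info is returned, it is compared with the locally cached route info to decide whether it has changed. If it has, the broker address cache brokerAddrTable is updated, the topicPublishInfoTable of each producer is updated, and the topic route cache topicRouteTable is updated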

    if (changed) {
        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
        }
        // Update Pub info
        if (!producerTable.isEmpty()) {
            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
            publishInfo.setHaveTopicRouterInfo(true);
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    impl.updateTopicPublishInfo(topic, publishInfo);
                }
            }
        }
        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
        this.topicRouteTable.put(topic, cloneTopicRouteData);
        return true;
    }

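Step 3: Select a message queue

Set the total number of send attempts: one initial attempt plus the failure retries for synchronous sends, a single attempt otherwise

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);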
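
The attempt budget can be checked with a tiny standalone version of the expression above. `RetryBudgetSketch` and its `Mode` enum are hypothetical stand-ins for the client classes; the retry count of 2 used in the note below is, to my knowledge, the default of retryTimesWhenSendFailed, but treat it as an assumption and confirm it against your client version.

```java
// Hypothetical stand-in for the timesTotal expression; not RocketMQ code.
final class RetryBudgetSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    /** Only synchronous sends get extra attempts inside the send loop. */
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }
}
```

With a retry setting of 2, a synchronous send is attempted up to 3 times, while asynchronous and one-way sends go through the loop only once.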

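First check whether the fault latency mechanism (sendLatencyFaultEnable) is enabled; it is disabled by default. On the first attempt lastBrokerName is null, so sendWhichQueue is incremented and taken modulo the queue count to pick a queue. If the send fails, the next attempt increments sendWhichQueue and takes the modulo again, skipping any queue that belongs to the broker that just failed, so that broker is avoided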

    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
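
The round-robin selection with broker avoidance can be tried out with the standalone sketch below. It is an illustration under assumptions: `RoundRobinSelectorSketch` and its nested `Queue` are hypothetical types, an `AtomicInteger` stands in for the per-thread counter sendWhichQueue, and `Math.floorMod` replaces the `Math.abs(index) % size` idiom so that a wrapped-around negative counter still maps to a valid index.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of round-robin queue selection that skips the last failed broker.
final class RoundRobinSelectorSketch {
    static final class Queue {
        final String broker;
        final int id;

        Queue(String broker, int id) {
            this.broker = broker;
            this.id = id;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter = new AtomicInteger(); // stands in for sendWhichQueue

    RoundRobinSelectorSketch(List<Queue> queues) {
        this.queues = queues;
    }

    private Queue next() {
        int index = counter.incrementAndGet();
        return queues.get(Math.floorMod(index, queues.size()));
    }

    /** lastBrokerName is null on the first attempt and the failed broker's name on retries. */
    Queue select(String lastBrokerName) {
        for (int i = 0; i < queues.size(); i++) {
            Queue candidate = next();
            if (lastBrokerName == null || !candidate.broker.equals(lastBrokerName)) {
                return candidate;
            }
        }
        return next(); // every queue lives on the failed broker, so take the next one anyway
    }
}
```

If all queues belong to the failed broker, the selector still returns a queue instead of giving up, which matches the fallback call at the end of the original method.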

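If the fault latency mechanism is enabled:

Queues are polled round-robin, and the first one whose broker is available is returned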

    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
        if (pos < 0)
            pos = 0;
        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
        if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
            return mq;
    }

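Availability check: the broker is looked up in faultItemTable, the set of brokers to avoid. If an entry exists, the broker counts as available only when the current timestamp is no earlier than the entry's startTimestamp, that is, the time of its last failure plus the avoidance duration. If the broker has no entry in the set, it is treated as available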

    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }
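
The timestamp comparison behind the availability check can be sketched as follows. `FaultTableSketch` is a hypothetical class: it stores only the "available again at" timestamp per broker and takes the current time as a parameter so the behavior is deterministic, whereas the real FaultItem also tracks latency and reads the system clock.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the fault table: broker name -> timestamp at which it is usable again.
final class FaultTableSketch {
    private final Map<String, Long> availableAgainAt = new ConcurrentHashMap<>();

    /** Records a failure: the broker is avoided until now + notAvailableDuration. */
    void markFault(String broker, long nowMillis, long notAvailableDurationMillis) {
        availableAgainAt.put(broker, nowMillis + notAvailableDurationMillis);
    }

    /** A broker with no entry is available; otherwise it is available once the deadline has passed. */
    boolean isAvailable(String broker, long nowMillis) {
        Long startTimestamp = availableAgainAt.get(broker);
        return startTimestamp == null || nowMillis - startTimestamp >= 0;
    }
}
```

For example, a broker marked at time 1000 with a 30000 ms avoidance duration is reported as unavailable until the clock reaches 31000.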

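If no broker is available, one is picked from the set of brokers being avoided (pickOneAtLeast). If the chosen broker has write queues, a queue on it is returned; if it has none, the broker is removed from the avoidance set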

    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    if (writeQueueNums > 0) {
        final MessageQueue mq = tpInfo.selectOneMessageQueue();
        if (notBestBroker != null) {
            mq.setBrokerName(notBestBroker);
            mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
        }
        return mq;
    } else {
        latencyFaultTolerance.remove(notBestBroker);
    }

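P.S.:

The set of brokers to avoid is only updated when sendLatencyFaultEnable is on. In 4.9.3, sendDefaultImpl calls updateFaultItem after every sendKernelImpl attempt, synchronous ones included (with isolation set to true when the attempt throws), and the asynchronous callback path calls it as well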

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
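
How a latency turns into an avoidance duration is easier to see with numbers. The sketch below mirrors the table lookup done by computeNotAvailableDuration as I understand it; the two arrays are what I believe to be the default latencyMax and notAvailableDuration values in the 4.9.x MQFaultStrategy, so treat them as an assumption and verify them against the source. `NotAvailableDurationSketch` itself is a hypothetical class.

```java
// Hypothetical re-creation of the latency -> avoidance duration lookup.
final class NotAvailableDurationSketch {
    // Assumed defaults (milliseconds); check MQFaultStrategy in your version.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Scans from the largest latency bucket down and returns the matching avoidance duration. */
    static long compute(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    /** With isolation the latency is treated as 30000 ms, as in updateFaultItem above. */
    static long forUpdate(long currentLatency, boolean isolation) {
        return compute(isolation ? 30000 : currentLatency);
    }
}
```

Under these assumed tables, a 40 ms send leads to no avoidance, a 600 ms send to 30 seconds, and an isolated failure (latency treated as 30000 ms) to 600 seconds.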

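This mainly updates currentLatency, the latency of the send, and startTimestamp, which is stored as the current time plus notAvailableDuration, i.e. the moment the broker becomes usable again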

    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

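Summary:

A failed broker is avoided whether or not the fault latency mechanism is enabled. With it enabled, the broker is not accessed at all for a period of time; without it, only the next attempt skips that broker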

Step 4: Send the message

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl

1. Assign a globally unique id to the message

    if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
    }

2. Compress the message body once it reaches 4 KB (compressMsgBodyOverHowmuch); batch messages are not compressed

    boolean msgBodyCompressed = false;
    if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
    }
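
The threshold-then-deflate behavior can be sketched with the JDK's own zip classes. This is an illustration, not the client's code: `CompressSketch` is a hypothetical class, the 4 KB threshold is taken from the text, and the compression level is passed in by the caller (5 is used in the note below only as an example value).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;

// Hypothetical model of "compress the body only when it is large enough".
final class CompressSketch {
    static final int COMPRESS_OVER = 4 * 1024; // threshold in bytes, as stated in the text

    /** Returns the same array when the body is below the threshold, a deflated copy otherwise. */
    static byte[] maybeCompress(byte[] body, int level) {
        if (body.length < COMPRESS_OVER) {
            return body;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream(body.length);
        Deflater deflater = new Deflater(level);
        try (DeflaterOutputStream deflaterStream = new DeflaterOutputStream(out, deflater)) {
            deflaterStream.write(body);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end(); // release native memory held by the caller-supplied Deflater
        }
        return out.toByteArray();
    }
}
```

Calling `CompressSketch.maybeCompress(body, 5)` on a 100-byte body returns the body untouched, while an 8 KB body of repetitive data comes back as a much smaller deflated array; in the real flow the COMPRESSED_FLAG bit tells the consumer side that the body must be inflated.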

3. If it is a transactional message, mark it as a prepared (half) transactional message in sysFlag

    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }
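
Since sysFlag is a bit field, the two steps above simply set independent bits. The sketch below shows the arithmetic; the constant values (0x1 for the compressed flag and 0x1 << 2 for the prepared-transaction type) are what I believe MessageSysFlag defines, so treat them as assumptions, and `SysFlagSketch` is a hypothetical class.

```java
// Hypothetical illustration of how sysFlag bits are combined.
final class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;                // assumed value of MessageSysFlag.COMPRESSED_FLAG
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2; // assumed value of MessageSysFlag.TRANSACTION_PREPARED_TYPE

    /** tranPreparedProperty is the raw string property; null or anything but "true" means not prepared. */
    static int build(boolean compressed, String tranPreparedProperty) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (Boolean.parseBoolean(tranPreparedProperty)) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }
}
```

A compressed prepared message therefore carries both bits (binary 101), and a missing property leaves the transaction bits at zero because `Boolean.parseBoolean(null)` is false.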

4. Check whether the call has already timed out

    long costTimeSync = System.currentTimeMillis() - beginStartTime;
    if (timeout < costTimeSync) {
        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
    }

5. Assemble the request header

    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
    requestHeader.setReconsumeTimes(0);
    requestHeader.setUnitMode(this.isUnitMode());
    requestHeader.setBatch(msg instanceof MessageBatch);
    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
        if (reconsumeTimes != null) {
            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
        }
        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
        if (maxReconsumeTimes != null) {
            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
        }
    }

6. Send the request

    case SYNC:
        long costTimeSync = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTimeSync) {
            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
        }
        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
            brokerAddr,
            mq.getBrokerName(),
            msg,
            requestHeader,
            timeout - costTimeSync,
            communicationMode,
            context,
            this);
        break;
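
The snippet above both rejects a call that has already used up its timeout and hands only the remaining time to the network layer. A minimal sketch of that budget arithmetic, with a hypothetical `TimeoutBudgetSketch` class and explicit timestamps instead of the system clock, could look like this:

```java
// Hypothetical model of the timeout budget passed down to the remoting call.
final class TimeoutBudgetSketch {
    /** Returns the time left for the remote call, or fails if the budget is already spent. */
    static long remaining(long timeoutMillis, long beginMillis, long nowMillis) {
        long cost = nowMillis - beginMillis;
        if (timeoutMillis < cost) {
            throw new IllegalStateException("sendKernelImpl call timeout");
        }
        return timeoutMillis - cost;
    }
}
```

With a 3000 ms timeout and 500 ms already spent on validation, routing, and queue selection, the remote call gets 2500 ms; note that a cost exactly equal to the timeout passes the check and leaves a budget of 0.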

Step 5: Handle the response

1. Map the response code to a send status

    switch (response.getCode()) {
        case ResponseCode.FLUSH_DISK_TIMEOUT: {
            sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
            break;
        }
        case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
            sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
            break;
        }
        case ResponseCode.SLAVE_NOT_AVAILABLE: {
            sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
            break;
        }
        case ResponseCode.SUCCESS: {
            sendStatus = SendStatus.SEND_OK;
            break;
        }
        default: {
            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
        }
    }
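
The mapping above means that only four response codes produce a SendResult; everything else surfaces as an exception. The sketch below models that with plain integers. The numeric codes (0, 10, 11, 12) are what I believe ResponseCode defines for these constants, so treat them as assumptions; `SendStatusSketch` and its enum are hypothetical stand-ins.

```java
// Hypothetical model of the response code -> send status mapping.
final class SendStatusSketch {
    enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // Assumed numeric values of the corresponding ResponseCode constants.
    static final int SUCCESS = 0;
    static final int FLUSH_DISK_TIMEOUT = 10;
    static final int SLAVE_NOT_AVAILABLE = 11;
    static final int FLUSH_SLAVE_TIMEOUT = 12;

    static SendStatus toStatus(int responseCode) {
        switch (responseCode) {
            case FLUSH_DISK_TIMEOUT:
                return SendStatus.FLUSH_DISK_TIMEOUT;
            case FLUSH_SLAVE_TIMEOUT:
                return SendStatus.FLUSH_SLAVE_TIMEOUT;
            case SLAVE_NOT_AVAILABLE:
                return SendStatus.SLAVE_NOT_AVAILABLE;
            case SUCCESS:
                return SendStatus.SEND_OK;
            default:
                // The real client throws MQBrokerException with the code, remark, and broker address.
                throw new IllegalArgumentException("broker rejected the send, code=" + responseCode);
        }
    }
}
```

One practical consequence is that a returned result does not by itself mean the message was fully stored and replicated, so callers that care about durability should compare the status with SEND_OK.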

2. Build the SendResult

    SendResult sendResult = new SendResult(sendStatus,
        uniqMsgId,
        responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
    sendResult.setTransactionId(responseHeader.getTransactionId());
    String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
    String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
    if (regionId == null || regionId.isEmpty()) {
        regionId = MixAll.DEFAULT_TRACE_REGION_ID;
    }
    if (traceOn != null && traceOn.equals("false")) {
        sendResult.setTraceOn(false);
    } else {
        sendResult.setTraceOn(true);
    }
    sendResult.setRegionId(regionId);