该文所涉及的 RocketMQ 源码版本为 4.9.3。

RocketMQ 消息发送存储流程

第一步:检查消息存储状态

org.apache.rocketmq.store.DefaultMessageStore#checkStoreStatus

1、检查 broker 是否可用

  1. if (this.shutdown) {
  2. log.warn("message store has shutdown, so putMessage is forbidden");
  3. return PutMessageStatus.SERVICE_NOT_AVAILABLE;
  4. }

2、检查 broker 的角色

  1. if (BrokerRole.SLAVE== this.messageStoreConfig.getBrokerRole()) {
  2. long value = this.printTimes.getAndIncrement();
  3. if ((value % 50000) == 0) {
  4. log.warn("broke role is slave, so putMessage is forbidden");
  5. }
  6. return PutMessageStatus.SERVICE_NOT_AVAILABLE;
  7. }

3、检查 messageStore 是否可写

  1. if (!this.runningFlags.isWriteable()) {
  2. long value = this.printTimes.getAndIncrement();
  3. if ((value % 50000) == 0) {
  4. log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
  5. "the broker's disk is full, write to logic queue error, write to index file error, etc");
  6. }
  7. return PutMessageStatus.SERVICE_NOT_AVAILABLE;
  8. } else {
  9. this.printTimes.set(0);
  10. }

4、检查 pageCache

  1. if (this.isOSPageCacheBusy()) {
  2. return PutMessageStatus.OS_PAGECACHE_BUSY;
  3. }

第二步:检查消息

org.apache.rocketmq.store.DefaultMessageStore#checkMessage

1、校验主题的长度不能大于 127

  1. if (msg.getTopic().length() > Byte.MAX_VALUE) {
  2. log.warn("putMessage message topic length too long " + msg.getTopic().length());
  3. return PutMessageStatus.MESSAGE_ILLEGAL;
  4. }

2、校验属性的长度不能大于 32767

  1. if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
  2. log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
  3. return PutMessageStatus.MESSAGE_ILLEGAL;
  4. }

第三步:获取当前可以写入的 CommitLog 文件

CommitLog 文件的存储目录为${ROCKET_HOME}/store/commitlog ,MappedFileQueue 对应此文件夹,MappedFile 对应文件夹下的文件

  1. msg.setStoreTimestamp(beginLockTimestamp);
  2. if (null == mappedFile || mappedFile.isFull()) {
  3. mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
  4. }
  5. if (null == mappedFile) {
  6. log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
  7. return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
  8. }

如果是第一次写入或者最新偏移量所属文件已满,创建新的文件

  1. public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
  2. long createOffset = -1;
  3. MappedFile mappedFileLast = getLastMappedFile();
  4. if (mappedFileLast == null) {
  5. createOffset = startOffset - (startOffset % this.mappedFileSize);
  6. }
  7. if (mappedFileLast != null && mappedFileLast.isFull()) {
  8. createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
  9. }
  10. if (createOffset != -1 && needCreate) {
  11. return tryCreateMappedFile(createOffset);
  12. }
  13. return mappedFileLast;
  14. }

第四步:将消息写入到 MappedFile 中

  1. public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
  2. PutMessageContext putMessageContext) {
  3. assert messageExt != null;
  4. assert cb != null;
  5. int currentPos = this.wrotePosition.get();
  6. if (currentPos < this.fileSize) {
  7. ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
  8. byteBuffer.position(currentPos);
  9. AppendMessageResult result;
  10. if (messageExt instanceof MessageExtBrokerInner) {
  11. result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
  12. (MessageExtBrokerInner) messageExt, putMessageContext);
  13. } else if (messageExt instanceof MessageExtBatch) {
  14. result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
  15. (MessageExtBatch) messageExt, putMessageContext);
  16. } else {
  17. return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  18. }
  19. this.wrotePosition.addAndGet(result.getWroteBytes());
  20. this.storeTimestamp = result.getStoreTimestamp();
  21. return result;
  22. }
  23. log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
  24. return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
  25. }

org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend(long, java.nio.ByteBuffer, int, org.apache.rocketmq.store.MessageExtBrokerInner, org.apache.rocketmq.store.CommitLog.PutMessageContext)

计算要写入的偏移量

long wroteOffset = fileFromOffset + byteBuffer.position();

对事务消息做特殊处理:

  1. final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
  2. switch (tranType) {
  3. // Prepared and Rollback message is not consumed, will not enter the
  4. // consumer queue
  5. case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
  6. case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
  7. queueOffset = 0L;
  8. break;
  9. case MessageSysFlag.TRANSACTION_NOT_TYPE:
  10. case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
  11. default:
  12. break;
  13. }

构造 AppendMessageResult:

  1. AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
  2. msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

事务消息特殊处理:

  1. switch (tranType) {
  2. case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
  3. case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
  4. break;
  5. case MessageSysFlag.TRANSACTION_NOT_TYPE:
  6. case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
  7. // The next update ConsumeQueue information
  8. CommitLog.this.topicQueueTable.put(key, ++queueOffset);
  9. CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
  10. break;
  11. default:
  12. break;
  13. }