同步消息发送

1):同步消息发送代码执行

首先利用DefaultMQProducer实现类进行发送消息。

  1. //检查消息
  2. Validators.checkMessage(msg, this);
  3. //设置Topic
  4. msg.setTopic(withNamespace(msg.getTopic()));
  5. //调用MQProducer实现并返回结果
  6. return this.defaultMQProducerImpl.send(msg);

DefaultMQProducerImpl#send

  1. //加上发送默认超时时间
  2. send(msg, this.defaultMQProducer.getSendMsgTimeout());
  3. //默认为同步
  4. this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);

DefaultMQProducerImpl#sendDefaultImpl

  1. //确保当前状态为运行中,否则抛异常
  2. this.makeSureStateOK();
  3. //根据消息跟producer进行消息检查
  4. Validators.checkMessage(msg, this.defaultMQProducer);
  5. //生成随机invokeID
  6. final long invokeID = random.nextLong();
  7. //记录第一次开始时间
  8. long beginTimestampFirst = System.currentTimeMillis();
  9. //记录前一次开始时间
  10. long beginTimestampPrev = beginTimestampFirst;
  11. //结束时间
  12. long endTimestamp = beginTimestampFirst;
  13. //获取Topic订阅信息(主要为了确定将消息发到哪一个Broker上)
  14. TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
  15. if (topicPublishInfo != null && topicPublishInfo.ok()) {
  16. boolean callTimeout = false;
  17. MessageQueue mq = null;
  18. Exception exception = null;
  19. SendResult sendResult = null;
  20. //发送次数总数,若为同步次数为1+2=3,否则为1
  21. int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
  22. //当前次数
  23. int times = 0;
  24. String[] brokersSent = new String[timesTotal];
  25. //循环发送消息(用于重试)
  26. for (; times < timesTotal; times++) {
  27. //获取broker服务名称
  28. String lastBrokerName = null == mq ? null : mq.getBrokerName();
  29. //根据Topic信息+broker名称选择一个消息队列
  30. MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
  31. if (mqSelected != null) {
  32. mq = mqSelected;
  33. brokersSent[times] = mq.getBrokerName();
  34. try {
  35. beginTimestampPrev = System.currentTimeMillis();
  36. //若times大于0,说明需要重试
  37. if (times > 0) {
  38. //重新发送时,重设Topic名称
  39. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
  40. }
  41. //计算耗时
  42. long costTime = beginTimestampPrev - beginTimestampFirst;
  43. //若耗时已经超过发送时的超时时间,则退出
  44. if (timeout < costTime) {
  45. callTimeout = true;
  46. break;
  47. }
  48. //执行消息发送
  49. sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
  50. //计算结束时间
  51. endTimestamp = System.currentTimeMillis();
  52. //更新当前Broker状态。表示当前broker是好用的
  53. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
  54. switch (communicationMode) {
  55. case ASYNC:
  56. return null;
  57. case ONEWAY:
  58. return null;
  59. case SYNC:
  60. //同步,判断发送不成功的情况下是否需要重试
  61. if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
  62. if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
  63. continue;
  64. }
  65. }
  66. //返回结果
  67. return sendResult;
  68. default:
  69. break;
  70. }
  71. } catch (RemotingException e) {
  72. //计算结束时间
  73. endTimestamp = System.currentTimeMillis();
  74. //更新当前Broker。表示当前broker是不好用的
  75. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  76. log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  77. log.warn(msg.toString());
  78. exception = e;
  79. continue;
  80. } catch (MQClientException e) {
  81. //同上
  82. endTimestamp = System.currentTimeMillis();
  83. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  84. log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  85. log.warn(msg.toString());
  86. exception = e;
  87. continue;
  88. } catch (MQBrokerException e) {
  89. //同上
  90. endTimestamp = System.currentTimeMillis();
  91. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
  92. log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  93. log.warn(msg.toString());
  94. exception = e;
  95. if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
  96. continue;
  97. } else {
  98. if (sendResult != null) {
  99. return sendResult;
  100. }
  101. throw e;
  102. }
  103. } catch (InterruptedException e) {
  104. //同上
  105. endTimestamp = System.currentTimeMillis();
  106. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
  107. log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
  108. log.warn(msg.toString());
  109. log.warn("sendKernelImpl exception", e);
  110. log.warn(msg.toString());
  111. throw e;
  112. }
  113. } else {
  114. break;
  115. }
  116. }
  117. //返回结果不为空,则返回
  118. if (sendResult != null) {
  119. return sendResult;
  120. }
  121. //报错
  122. String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
  123. times,
  124. System.currentTimeMillis() - beginTimestampFirst,
  125. msg.getTopic(),
  126. Arrays.toString(brokersSent));
  127. info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
  128. MQClientException mqClientException = new MQClientException(info, exception);
  129. if (callTimeout) {
  130. throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
  131. }
  132. if (exception instanceof MQBrokerException) {
  133. mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
  134. } else if (exception instanceof RemotingConnectException) {
  135. mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
  136. } else if (exception instanceof RemotingTimeoutException) {
  137. mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
  138. } else if (exception instanceof MQClientException) {
  139. mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
  140. }
  141. throw mqClientException;
  142. }
  143. //检查NameServer地址设置,地址为空就抛异常
  144. validateNameServerSetting();
  145. //抛异常
  146. throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
  147. null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

DefaultMQProducerImpl#sendKernelImpl

  1. //计算开始时间
  2. long beginStartTime = System.currentTimeMillis();
  3. //根据Broker名称获取Broker地址
  4. String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  5. //若Broker地址为空
  6. if (null == brokerAddr) {
  7. //那就根据Topic重新检索一下Broker
  8. tryToFindTopicPublishInfo(mq.getTopic());
  9. brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
  10. }
  11. SendMessageContext context = null;
  12. if (brokerAddr != null) {
  13. //Vip通道Broker
  14. brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
  15. byte[] prevBody = msg.getBody();
  16. try {
  17. //for MessageBatch,ID has been set in the generating process
  18. if (!(msg instanceof MessageBatch)) {
  19. //设置唯一ID
  20. MessageClientIDSetter.setUniqID(msg);
  21. }
  22. boolean topicWithNamespace = false;
  23. //设置nameSpace
  24. if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
  25. msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
  26. topicWithNamespace = true;
  27. }
  28. int sysFlag = 0;
  29. boolean msgBodyCompressed = false;
  30. //压缩msg
  31. if (this.tryToCompressMessage(msg)) {
  32. sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
  33. msgBodyCompressed = true;
  34. }
  35. //系统标识
  36. final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
  37. if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
  38. sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
  39. }
  40. ../部分代码省略
  41. //组装消息头
  42. SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
  43. requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
  44. requestHeader.setTopic(msg.getTopic());
  45. requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
  46. requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
  47. requestHeader.setQueueId(mq.getQueueId());
  48. requestHeader.setSysFlag(sysFlag);
  49. requestHeader.setBornTimestamp(System.currentTimeMillis());
  50. requestHeader.setFlag(msg.getFlag());
  51. requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
  52. requestHeader.setReconsumeTimes(0);
  53. requestHeader.setUnitMode(this.isUnitMode());
  54. //批量标识
  55. requestHeader.setBatch(msg instanceof MessageBatch);
  56. //判断Topic是否重试
  57. if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
  58. String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
  59. if (reconsumeTimes != null) {
  60. requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
  61. MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
  62. }
  63. String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
  64. if (maxReconsumeTimes != null) {
  65. requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
  66. MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
  67. }
  68. }
  69. SendResult sendResult = null;
  70. switch (communicationMode) {
  71. //异步
  72. case ASYNC:
  73. ../部分代码省略
  74. //计算耗时
  75. long costTimeAsync = System.currentTimeMillis() - beginStartTime;
  76. if (timeout < costTimeAsync) {
  77. throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
  78. }
  79. //发送消息
  80. sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  81. brokerAddr,
  82. mq.getBrokerName(),
  83. tmpMessage,
  84. requestHeader,
  85. timeout - costTimeAsync,
  86. communicationMode,
  87. sendCallback,
  88. topicPublishInfo,
  89. this.mQClientFactory,
  90. this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
  91. context,
  92. this);
  93. break;
  94. case ONEWAY:
  95. case SYNC:
  96. //计算耗时
  97. long costTimeSync = System.currentTimeMillis() - beginStartTime;
  98. if (timeout < costTimeSync) {
  99. throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
  100. }
  101. //发送消息
  102. sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
  103. brokerAddr,
  104. mq.getBrokerName(),
  105. msg,
  106. requestHeader,
  107. timeout - costTimeSync,
  108. communicationMode,
  109. context,
  110. this);
  111. break;
  112. default:
  113. assert false;
  114. break;
  115. }
  116. ../部分代码省略

MQClientAPIImpl#sendMessage

  1. ../部分代码省略
  2. switch (communicationMode) {
  3. case ONEWAY:
  4. //单向发送
  5. this.remotingClient.invokeOneway(addr, request, timeoutMillis);
  6. return null;
  7. case ASYNC:
  8. final AtomicInteger times = new AtomicInteger();
  9. //计算耗时
  10. long costTimeAsync = System.currentTimeMillis() - beginStartTime;
  11. if (timeoutMillis < costTimeAsync) {
  12. throw new RemotingTooMuchRequestException("sendMessage call timeout");
  13. }
  14. //异步发送
  15. this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
  16. retryTimesWhenSendFailed, times, context, producer);
  17. return null;
  18. case SYNC:
  19. //计算耗时
  20. long costTimeSync = System.currentTimeMillis() - beginStartTime;
  21. if (timeoutMillis < costTimeSync) {
  22. throw new RemotingTooMuchRequestException("sendMessage call timeout");
  23. }
  24. //同步发送
  25. return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
  26. default:
  27. assert false;
  28. break;
  29. }
  30. return null;

MQClientAPIImpl#sendMessageSync

  1. //远程调用同步请求
  2. RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
  3. assert response != null;
  4. //返回结果
  5. return this.processSendResponse(brokerName, msg, response,addr);

MQClientAPIImpl#invokeSync

  1. //计算开始时间
  2. long beginStartTime = System.currentTimeMillis();
  3. //根据Broker的地址创建通道
  4. final Channel channel = this.getAndCreateChannel(addr);
  5. //判断通道不为空且存活
  6. if (channel != null && channel.isActive()) {
  7. try {
  8. //执行before钩子
  9. doBeforeRpcHooks(addr, request);
  10. //计算耗时
  11. long costTime = System.currentTimeMillis() - beginStartTime;
  12. if (timeoutMillis < costTime) {
  13. throw new RemotingTimeoutException("invokeSync call timeout");
  14. }
  15. //调用同步实现,返回结果
  16. RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
  17. //执行after钩子
  18. doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
  19. return response;
  20. } catch (RemotingSendRequestException e) {
  21. log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
  22. this.closeChannel(addr, channel);
  23. throw e;
  24. } catch (RemotingTimeoutException e) {
  25. //超时关闭通道
  26. if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
  27. this.closeChannel(addr, channel);
  28. log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
  29. }
  30. log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
  31. throw e;
  32. }
  33. } else {
  34. //关闭通道,抛异常
  35. this.closeChannel(addr, channel);
  36. throw new RemotingConnectException(addr);
  37. }

MQClientAPIImpl#invokeSyncImpl

  1. //请求ID,producer唯一
  2. final int opaque = request.getOpaque();
  3. try {
  4. //根据请求ID,创建返回future
  5. final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
  6. //放入返回集合
  7. this.responseTable.put(opaque, responseFuture);
  8. //获取Socket服务器地址,也就是Broker地址
  9. final SocketAddress addr = channel.remoteAddress();
  10. //写入请求消息,并刷新
  11. channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
  12. @Override
  13. public void operationComplete(ChannelFuture f) throws Exception {
  14. if (f.isSuccess()) {
  15. responseFuture.setSendRequestOK(true);
  16. return;
  17. } else {
  18. responseFuture.setSendRequestOK(false);
  19. }
  20. responseTable.remove(opaque);
  21. responseFuture.setCause(f.cause());
  22. responseFuture.putResponse(null);
  23. log.warn("send a request command to channel <" + addr + "> failed.");
  24. }
  25. });
  26. //这里利用的是Netty的机制,通过CountDownLatch来实现
  27. RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
  28. if (null == responseCommand) {
  29. if (responseFuture.isSendRequestOK()) {
  30. throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
  31. responseFuture.getCause());
  32. } else {
  33. throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
  34. }
  35. }
  36. return responseCommand;
  37. } finally {
  38. //删除该结果,避免集合越来越庞大
  39. this.responseTable.remove(opaque);
  40. }

2):同步消息发送逻辑总结

  1. 消息长度验证
  2. 查找主题路由信息,若没有从NameServer中更新
  3. 设置重试次数,并选择Broker的消息队列。若出现异常情况,切换Broker进行重试
  4. 发送消息(检查钩子,压缩消息,组装头部信息)
    1. 判断发送方式,调用Netty的Socket通道进行发送消息请求
    2. 获取结果指令,返回