同步消息发送
1):同步消息发送代码执行
首先利用DefaultMQProducer实现类进行发送消息。
//检查消息Validators.checkMessage(msg, this);//设置Topicmsg.setTopic(withNamespace(msg.getTopic()));//调用MQProducer实现并返回结果return this.defaultMQProducerImpl.send(msg);
DefaultMQProducerImpl#send
//加上发送默认超时时间send(msg, this.defaultMQProducer.getSendMsgTimeout());//默认为同步this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
DefaultMQProducerImpl#sendDefaultImpl
//确保当前状态为运行中,否则抛异常this.makeSureStateOK();//根据消息跟producer进行消息检查Validators.checkMessage(msg, this.defaultMQProducer);//生成随机invokeIDfinal long invokeID = random.nextLong();//记录第一次开始时间long beginTimestampFirst = System.currentTimeMillis();//记录前一次开始时间long beginTimestampPrev = beginTimestampFirst;//结束时间long endTimestamp = beginTimestampFirst;//获取Topic订阅信息(主要为了确定将消息发到哪一个Broker上)TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;//发送次数总数,若为同步次数为1+2=3,否则为1int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;//当前次数int times = 0;String[] brokersSent = new String[timesTotal];//循环发送消息(用于重试)for (; times < timesTotal; times++) {//获取broker服务名称String lastBrokerName = null == mq ? null : mq.getBrokerName();//根据Topic信息+broker名称选择一个消息队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();//若times大于0,说明需要重试if (times > 0) {//重新发送时,重设Topic名称msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}//计算耗时long costTime = beginTimestampPrev - beginTimestampFirst;//若耗时已经超过发送时的超时时间,则退出if (timeout < costTime) {callTimeout = true;break;}//执行消息发送sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);//计算结束时间endTimestamp = System.currentTimeMillis();//更新当前Broker状态。表示当前broker是好用的this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC://同步,判断发送不成功的情况下是否需要重试if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}//返回结果return sendResult;default:break;}} catch (RemotingException e) {//计算结束时间endTimestamp = System.currentTimeMillis();//更新当前Broker。表示当前broker是不好用的this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {//同上endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {//同上endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {//同上endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn("sendKernelImpl exception", e);log.warn(msg.toString());throw e;}} else {break;}}//返回结果不为空,则返回if (sendResult != null) {return sendResult;}//报错String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}//检查NameServer地址设置,地址为空就抛异常validateNameServerSetting();//抛异常throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
DefaultMQProducerImpl#sendKernelImpl
//计算开始时间long beginStartTime = System.currentTimeMillis();//根据Broker名称获取Broker地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());//若Broker地址为空if (null == brokerAddr) {//那就根据Topic重新检索一下BrokertryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {//Vip通道BrokerbrokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {//设置唯一IDMessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;//设置nameSpaceif (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;//压缩msgif (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;msgBodyCompressed = true;}//系统标识final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}../部分代码省略//组装消息头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());//批量标识requestHeader.setBatch(msg instanceof MessageBatch);//判断Topic是否重试if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {//异步case ASYNC:../部分代码省略//计算耗时long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//发送消息sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC://计算耗时long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//发送消息sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}../部分代码省略
MQClientAPIImpl#sendMessage
../部分代码省略switch (communicationMode) {case ONEWAY://单向发送this.remotingClient.invokeOneway(addr, request, timeoutMillis);return null;case ASYNC:final AtomicInteger times = new AtomicInteger();//计算耗时long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeAsync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}//异步发送this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,retryTimesWhenSendFailed, times, context, producer);return null;case SYNC://计算耗时long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTimeSync) {throw new RemotingTooMuchRequestException("sendMessage call timeout");}//同步发送return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);default:assert false;break;}return null;
MQClientAPIImpl#sendMessageSync
//远程调用同步请求RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);assert response != null;//返回结果return this.processSendResponse(brokerName, msg, response,addr);
MQClientAPIImpl#invokeSync
//计算开始时间long beginStartTime = System.currentTimeMillis();//根据Broker的地址创建通道final Channel channel = this.getAndCreateChannel(addr);//判断通道不为空且存活if (channel != null && channel.isActive()) {try {//执行before钩子doBeforeRpcHooks(addr, request);//计算耗时long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {throw new RemotingTimeoutException("invokeSync call timeout");}//调用同步实现,返回结果RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);//执行after钩子doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {//超时关闭通道if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {//关闭通道,抛异常this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}
MQClientAPIImpl#invokeSyncImpl
//请求ID,producer唯一final int opaque = request.getOpaque();try {//根据请求ID,创建返回futurefinal ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);//放入返回集合this.responseTable.put(opaque, responseFuture);//获取Socket服务器地址,也就是Broker地址final SocketAddress addr = channel.remoteAddress();//写入请求消息,并刷新channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});//这里利用的是Netty的机制,通过CountDownLatch来实现RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}return responseCommand;} finally {//删除该结果,避免集合越来越庞大this.responseTable.remove(opaque);}
2):同步消息发送逻辑总结
- 消息长度验证
- 查找主题路由信息,若没有从NameServer中更新
- 设置重试次数,并选择Broker的消息队列。若出现异常情况,切换Broker进行重试
- 发送消息(检查钩子,压缩消息,组装头部信息)
- 判断发送方式,调用Netty的Socket通道进行发送消息请求
- 获取结果指令,返回
