同步消息发送
1):同步消息发送代码执行
首先利用DefaultMQProducer实现类进行发送消息。
//检查消息
Validators.checkMessage(msg, this);
//设置Topic
msg.setTopic(withNamespace(msg.getTopic()));
//调用MQProducer实现并返回结果
return this.defaultMQProducerImpl.send(msg);
DefaultMQProducerImpl#send
//加上发送默认超时时间
send(msg, this.defaultMQProducer.getSendMsgTimeout());
//默认为同步
this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
DefaultMQProducerImpl#sendDefaultImpl
//确保当前状态为运行中,否则抛异常
this.makeSureStateOK();
//根据消息跟producer进行消息检查
Validators.checkMessage(msg, this.defaultMQProducer);
//生成随机invokeID
final long invokeID = random.nextLong();
//记录第一次开始时间
long beginTimestampFirst = System.currentTimeMillis();
//记录前一次开始时间
long beginTimestampPrev = beginTimestampFirst;
//结束时间
long endTimestamp = beginTimestampFirst;
//获取Topic的发布(路由)信息(主要为了确定将消息发到哪一个Broker的哪一个队列上)
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//发送次数总数:同步模式为1+重试次数(retryTimesWhenSendFailed,默认为2,即共3次),否则为1
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
//当前次数
int times = 0;
String[] brokersSent = new String[timesTotal];
//循环发送消息(用于重试)
for (; times < timesTotal; times++) {
//获取上一次发送所选队列的Broker名称(首次发送时mq为null,故为null)
String lastBrokerName = null == mq ? null : mq.getBrokerName();
//根据Topic信息+broker名称选择一个消息队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
//若times大于0,说明需要重试
if (times > 0) {
//重新发送时,重设Topic名称
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
//计算耗时
long costTime = beginTimestampPrev - beginTimestampFirst;
//若耗时已经超过发送时的超时时间,则退出
if (timeout < costTime) {
callTimeout = true;
break;
}
//执行消息发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
//计算结束时间
endTimestamp = System.currentTimeMillis();
//更新当前Broker状态。表示当前broker是好用的
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
//同步,判断发送不成功的情况下是否需要重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
//返回结果
return sendResult;
default:
break;
}
} catch (RemotingException e) {
//计算结束时间
endTimestamp = System.currentTimeMillis();
//更新当前Broker。表示当前broker是不好用的
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
//同上
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
//记录耗时并更新Broker状态同上;但仅当响应码属于可重试响应码时才重试,否则返回已有结果或抛出异常
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
} else {
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
//线程被中断:updateFaultItem的第三个参数为false(不把该Broker标记为不可用),记录日志后直接抛出异常,不再重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
//返回结果不为空,则返回
if (sendResult != null) {
return sendResult;
}
//报错
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
//检查NameServer地址设置,地址为空就抛异常
validateNameServerSetting();
//抛异常
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
DefaultMQProducerImpl#sendKernelImpl
//计算开始时间
long beginStartTime = System.currentTimeMillis();
//根据Broker名称获取Broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//若Broker地址为空
if (null == brokerAddr) {
//那就根据Topic重新检索一下Broker
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
//根据是否开启VIP通道(isSendMessageWithVIPChannel),将Broker地址转换为对应的VIP通道地址
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
if (!(msg instanceof MessageBatch)) {
//设置唯一ID
MessageClientIDSetter.setUniqID(msg);
}
boolean topicWithNamespace = false;
//设置nameSpace
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
//压缩msg
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
//若为事务prepared消息,则在系统标识(sysFlag)中加上事务prepared标记
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//...部分代码省略
//组装消息头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
//批量标识
requestHeader.setBatch(msg instanceof MessageBatch);
//判断是否为重试Topic(Topic以重试组前缀RETRY_GROUP_TOPIC_PREFIX开头)
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
//异步
case ASYNC:
//...部分代码省略
//计算耗时
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
//发送消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
case SYNC:
//计算耗时
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
//发送消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
//...部分代码省略
MQClientAPIImpl#sendMessage
//...部分代码省略
switch (communicationMode) {
case ONEWAY:
//单向发送
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
return null;
case ASYNC:
final AtomicInteger times = new AtomicInteger();
//计算耗时
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
//异步发送
this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, context, producer);
return null;
case SYNC:
//计算耗时
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTimeSync) {
throw new RemotingTooMuchRequestException("sendMessage call timeout");
}
//同步发送
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
default:
assert false;
break;
}
return null;
MQClientAPIImpl#sendMessageSync
//远程调用同步请求
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
//返回结果
return this.processSendResponse(brokerName, msg, response,addr);
NettyRemotingClient#invokeSync
//计算开始时间
long beginStartTime = System.currentTimeMillis();
//根据Broker的地址创建通道
final Channel channel = this.getAndCreateChannel(addr);
//判断通道不为空且存活
if (channel != null && channel.isActive()) {
try {
//执行before钩子
doBeforeRpcHooks(addr, request);
//计算耗时
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout");
}
//调用同步实现,返回结果
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
//执行after钩子
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
//超时关闭通道
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
//关闭通道,抛异常
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
NettyRemotingAbstract#invokeSyncImpl
//请求ID,producer唯一
final int opaque = request.getOpaque();
try {
//根据请求ID,创建返回future
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
//放入返回集合
this.responseTable.put(opaque, responseFuture);
//获取Socket服务器地址,也就是Broker地址
final SocketAddress addr = channel.remoteAddress();
//写入请求消息,并刷新
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
// Listener callback run once the writeAndFlush of the request has completed.
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
// Write succeeded: flag the request as sent and return without touching the future further.
responseFuture.setSendRequestOK(true);
return;
} else {
responseFuture.setSendRequestOK(false);
}
// Write failed: drop the pending entry, record the failure cause, and hand a null response to the future.
responseTable.remove(opaque);
responseFuture.setCause(f.cause());
// NOTE(review): presumably this releases the thread blocked in waitResponse — confirm against ResponseFuture.
responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed.");
}
});
//阻塞等待响应:ResponseFuture内部通过JDK的CountDownLatch实现等待(并非Netty的机制),超时未收到响应则返回null
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
responseFuture.getCause());
} else {
throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
}
}
return responseCommand;
} finally {
//删除该结果,避免集合越来越庞大
this.responseTable.remove(opaque);
}
2):同步消息发送逻辑总结
- 消息长度验证
- 查找主题路由信息,若本地没有则从NameServer中更新
- 设置重试次数,并选择Broker的消息队列。若出现异常情况,切换Broker进行重试
- 发送消息(检查钩子,压缩消息,组装头部信息)
- 判断发送方式,调用Netty的Socket通道进行发送消息请求
- 获取结果指令,返回