1. 消息发送介绍
RocketMQ提供三种消息发送方式,分别是同步(Syn)、异步(Asyn)和单向(one way)
- 同步:生产者发送消息时,同步等待消息服务器发送结果
- 异步:生产者发送消息时,指定发送成功后的回调函数,生产者调用消息发送接口时立即返回结果,消息发送线程不阻塞。消息发送成功或者失败时回调函数通过一个新的线程来执行
单向:生产者发送消息时直接返回,不等待消息服务器的结果,也不注册回调函数。简单概括为只管消息发送,不管消息发送结果
1.1.Topic 路由机制
我们所说的消息发送,其实就是将消息发送至某个topic,在初次发送时会根据topic的名称向Nameserver 集群查询topic的路由信息,然后将路由信息存储在生产者本地缓存中,然后每隔30s遍历本地缓存中的topic,向Nameserver 查询最新的路由信息。如果成功查到路由信息,会将这些信息更新至本地缓存,实现topic 路由信息的动态感知
RocketMQ 还提供自动创建主题(Topic) 机制,如果topic不存在,并且开启了自动创建主题机制配置,那么会使用默认的路由信息进行路由1.2. 消息发送高可用
生产者在自动发现主题的路由信息后,RocketMQ 默认使用轮询算法进行路由的负载均衡。RocketMQ 在消息发送时也支持自定义队列负载均衡,但是使用自定义路由负债时重试机制会失效
RocketMQ 引入两个重要的特点来保证消息的高可用消息发送重试机制
RocketMQ 在消息发送时如果出现失败,默认会重试两次
- 故障规避机制
当消息第一次发送失败时,如果下一次消息还是发送到刚刚失败的Broker上,其消息发送失败的概率会很大,为了保证重试的可靠性,在重试时会尽量避开刚刚接收失败的Broker,而是选择其他Broker上的队列进行发送来提高消息发送的成功率
2. 生产者启动
消息生产者代码都在client 模块中,对于RocketMQ来说它既是消息客户端,也是消息生产者。
2.1. 时序图
2.2. 源码分析
生产者启动入口在org.apache.rocketmq.client.producer.DefaultMQProducer.start() 方法
@Override
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
// 调用DefaultMQProducerImpl 对象的start()方法启动生产者
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
进入DefaultMQProducerImpl 类的start()方法,启动主要分为四个部分
- 检查producerGroup 是否符合要求
- 创建MQClientInstance 实例
- 向MQClientInstance 注册服务
启动MQClientInstance ```java public void start(final boolean startFactory) throws MQClientException { switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// Step1: 检查producerGroup 是否符合要求,改变生产者的instanceName为进程ID
this.checkConfig();
// 如果生产者的实例名称为默认DEFAULT,那么将实例名称替换为进程ID
// 这么做的目的是解决在同一个物理机上部署多个生产者应用导致clientId 混乱(重复)问题
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
// Step2:创建MQClientInstance 实例
// 在一个JVM中,MQClientInstance 实例只存在一个
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
// Step3:向MQClientInstance 注册服务,将当前生产者加入MQClientInstance 管理,便于后续调用网络请求、心跳检测等
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 默认TopicKey
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
// Step4: 启动MQClientInstance,如果MQClientInstanec 已经启动,本次启动不会去执行
if (startFactory) {
mQClientFactory.start();
}
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
RequestFutureHolder.getInstance().startScheduledTask(this);
}
<a name="mBl2v"></a>
### 2.2.1. 检查producerGroup
检查producerGroup 是否符合要求,改变生产者的instanceName为进程ID
```java
private void checkConfig() throws MQClientException {
// 检查producerGroup
Validators.checkGroup(this.defaultMQProducer.getProducerGroup());
if (null == this.defaultMQProducer.getProducerGroup()) {
throw new MQClientException("producerGroup is null", null);
}
if (this.defaultMQProducer.getProducerGroup().equals(MixAll.DEFAULT_PRODUCER_GROUP)) {
throw new MQClientException("producerGroup can not equal " + MixAll.DEFAULT_PRODUCER_GROUP + ", please specify another one.",
null);
}
}
producerGroup 检查最终交给Validators.checkGroup()方法校验,主要对producerGroup做了非空和长度校验
public static void checkGroup(String group) throws MQClientException {
// 非空校验
if (UtilAll.isBlank(group)) {
throw new MQClientException("the specified group is blank", null);
}
// 长度校验
if (group.length() > CHARACTER_MAX_LENGTH) {
throw new MQClientException("the specified group is longer than group max length 255.", null);
}
if (isTopicOrGroupIllegal(group)) {
throw new MQClientException(String.format(
"the specified group[%s] contains illegal characters, allowing only %s", group,
"^[%|a-zA-Z0-9_-]+$"), null);
}
}
如果生产者的实例名称为默认DEFAULT,那么将实例名称替换为进程ID,这么做的目的是解决在同一个物理机上部署多个生产者应用导致clientId 混乱(重复)问题
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
this.defaultMQProducer.changeInstanceNameToPID();
}
替换实例名称为进程ID
public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}
2.2.2. 创建MQClientInstance实例
MQClientInstance 实例的创建在MQClientManager.getOrCreateMQClientInstance()方法中创建
在一个JVM中,RocketMQ只允许一个MQClientInstance 实例存在。在MQClientManager类中维护一个缓存表Map结构,名称为factoryTable ,key=clientId,value=MQClientInstance。
public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
// 创建clientId
String clientId = clientConfig.buildMQClientId();
// 根据clientId从本地缓存factoryTable中获取MQClientInstance
// 确保一个clientId 只会创建一个实例
MQClientInstance instance = this.factoryTable.get(clientId);
// 为空说明还没有创建MQClientInstance 实例
if (null == instance) {
// 创建MqClientInstance 实例
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
// 将MQClientInstance 实例加入factoryTable 缓存中
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
}
clientid 的生成规则,ip+@+生产者实例名称+unitname(可选)。
注意:如果在同一个物理机上部署两个生产者应用程序,clientId 相同,为了避免造成这样的混乱
解决方案如果实例名称(instance)为默认值DEFAULT,那么会将instance设置为进程ID来避免不同进程相互影响的问题。
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
2.2.3. 向MQClientInstance 注册服务
创建号MQClientInstance 实例后,接下来就是将当前生产者加入MQClientInstance 来管理,方便后续调用网络请求、心跳检测等
public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {
if (null == group || null == producer) {
return false;
}
// 将生产者对象加入producerTable缓存表中(Map结构),key=生产者名称,value=生产者实例
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
}
2.2.4. 启动MQClientInstance
public void start() throws MQClientException {
synchronized (this) {
// 只有serviceState没有启动时才创建,否则不会创建
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
2.2.5. 生产者状态
消息生产者一共有四个状态,分别是创建、运行、关闭、失败四个状态,默认为创建状态,在调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.start()方法启动时,状态首先改变为START_FAILED状态,当生产者启动完成之后状态改变为RUNNING,在调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.shutdown()方法时,状态改变为SHUTDOWN_ALREADY 状态
public enum ServiceState {
/**
* Service just created,not start
*/
CREATE_JUST,
/**
* Service Running
*/
RUNNING,
/**
* Service shutdown
*/
SHUTDOWN_ALREADY,
/**
* Service Start failure
*/
START_FAILED;
}
3. 消息发送流程
创建完DefaultMQProducer对象后,接着我们就可以调用send()方法来发送消息,在MQProducer接口中提供了很多send()方法来发送消息
lt send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback) throws MQClientException, RemotingException,
InterruptedException;
void send(final Message msg, final MessageQueueSelector selector, final Object arg,
final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
InterruptedException;
//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
SendResult send(final Collection<Message> msgs, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void send(final Collection<Message> msgs, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException;
void send(final Collection<Message> msgs, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//for rpc
Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final RequestCallback requestCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
InterruptedException;
void request(final Message msg, final MessageQueueSelector selector, final Object arg,
final RequestCallback requestCallback,
final long timeout) throws MQClientException, RemotingException,
InterruptedException, MQBrokerException;
Message request(final Message msg, final MessageQueue mq, final long timeout)
throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
3.1. 时序图
3.2. 消息发送入口
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl()方法作为消息发送的入口,生产者不管调用哪个消息发送方法,最终都会执行到sendDefaultImpl()方法,所以我们叫sendDefaultImpl()为消息发送入口方法
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 生产者状态验证,状态一定要是RUNING状态
this.makeSureStateOK();
// Step1: 消息长度验证,要求主题名称、消息提不能为空。
// 消息长度要大于0且不能超过允许发送的最大长度1024 * 1024 * 4 (4MB)
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// Step2: 查找主题的路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// Step3: 主题信息没有问题,继续下一步选择队列
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 最大重试次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
// 重试计数器
int times = 0;
String[] brokersSent = new String[timesTotal];
// 循环,实现消息重试,最大重试次数不能超过timesTotal
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 选择消息队列,轮询策略,通过生成一个随机数,然后递增和消息队列长度取模,根据取模位置返回消息队列
// 消息队列选择有两种方式,通过sendLatencyFaultEnable 属性设置:
// 1. 启用Broker 故障延迟机制
// 2. 不启用Broker 故障延迟机制
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
// 记录每一次发送消息的broker名称
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
if (timeout < costTime) {
callTimeout = true;
break;
}
// Step4: 调用消息发送核心方法
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
return sendResult;
default:
break;
}
} catch (RemotingException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
} else {
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
通过分析消息发送入口方法,消息发送大致分为4个步骤
- 消息长度验证
- 查找主题路由信息
- 选择消息队列
- 调用消息发送核心方法
3.2. 消息长度验证
消息的合法性进行检查的逻辑很简单,主要要求主题名称、消息提不能为空,消息长度要大于0且不能超过允许发送的最大长度1024 1024 4 (4MB)
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
// 消息主题的长度和非空验证
Validators.checkTopic(msg.getTopic());
Validators.isNotAllowedSendTopic(msg.getTopic());
// 消息非空验证
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
// 消息最小长度验证,长度一定要大于0
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
// 消息最大长度验证
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
3.2. 查找主题路由信息
3.2.1. 时序图
3.2.2. 源码分析
在消息发送之前,需要获取主题的路由信息,只有获得路由信息我们才知道消息具体要发送到哪个Broker节点上,如果生产者中缓存了Topic的路由信息,且该路由信息包含消息队列,则直接返回该路由信息。如果缓存或者没有包含消息队列,就从Nameserver查询该topic的路由信息,如果最终未找到路由信息,就抛出无法找到主题的异常
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 从生产者本地缓存获取topic的路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 如果本地缓存,向Nameserver查询该topic的路由信息
if (null == topicPublishInfo ,|| !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 从Nameserver 查询topic的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 是否查询到Topic的路由信息校验, 查询到直接返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 没有查询到Topic的路由信息,查询默认主题的路由信息
// 注意:只有autoCreateTopicEnable=true时,Topic没有路由信息时才会使用默认路由,否则抛出异常
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
主题路由信息描述在TopicPublishInfo类中,这个类的属性是用来描述主题的路由信息
public class TopicPublishInfo {
/**
* 是否顺序消息
*/
private boolean orderTopic = false;
/**
* 是否有主题路由信息标识
*/
private boolean haveTopicRouterInfo = false;
/**
* 当前主题的消息队列
*/
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/**
* 每选择依次消息队列,该值会自增1,如果超过Integer.MAX_VALUE,重置为0,用于选择消息队列
*/
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/**
* Topic 路由信息
*/
private TopicRouteData topicRouteData;
public boolean isOrderTopic() {
return orderTopic;
}
public void setOrderTopic(boolean orderTopic) {
this.orderTopic = orderTopic;
}
public boolean ok() {
return null != this.messageQueueList && !this.messageQueueList.isEmpty();
}
public List<MessageQueue> getMessageQueueList() {
return messageQueueList;
}
public void setMessageQueueList(List<MessageQueue> messageQueueList) {
this.messageQueueList = messageQueueList;
}
public ThreadLocalIndex getSendWhichQueue() {
return sendWhichQueue;
}
public void setSendWhichQueue(ThreadLocalIndex sendWhichQueue) {
this.sendWhichQueue = sendWhichQueue;
}
public boolean isHaveTopicRouterInfo() {
return haveTopicRouterInfo;
}
public void setHaveTopicRouterInfo(boolean haveTopicRouterInfo) {
this.haveTopicRouterInfo = haveTopicRouterInfo;
}
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
public int getQueueIdByBroker(final String brokerName) {
for (int i = 0; i < topicRouteData.getQueueDatas().size(); i++) {
final QueueData queueData = this.topicRouteData.getQueueDatas().get(i);
if (queueData.getBrokerName().equals(brokerName)) {
return queueData.getWriteQueueNums();
}
}
return -1;
}
@Override
public String toString() {
return "TopicPublishInfo [orderTopic=" + orderTopic + ", messageQueueList=" + messageQueueList
+ ", sendWhichQueue=" + sendWhichQueue + ", haveTopicRouterInfo=" + haveTopicRouterInfo + "]";
}
public TopicRouteData getTopicRouteData() {
return topicRouteData;
}
public void setTopicRouteData(final TopicRouteData topicRouteData) {
this.topicRouteData = topicRouteData;
}
}
Topic 元数据信息TopicRouteData 的属性字段描述
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
/**
* topic 的队列信息
*/
private List<QueueData> queueDatas;
/**
* topic 所在的Broker 信息
*/
private List<BrokerData> brokerDatas;
/**
* broker 上过滤服务器的地址列表
*/
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
当根据topic 名称从生产者本地获取不到主题路由信息时,从Nameserver 尝试获取路由信息,然后对比Nameserver中获取的路由信息和本地缓存表中的路由信息,如果不一致更新本地缓存表中的路由信息,通过updateTopicRouteInfoFromNameServer()获取路由信息
/**
* 更新消息生产者和维护路由缓存
*
* @param topic 主题名称
* @param isDefault 是否查询默认主题路由信息
* @param defaultMQProducer 生产者实例
* @return
*/
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
// 上锁
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
// 处理默认主题的路由信息
if (isDefault && defaultMQProducer != null) {
// 查询默认主题路由信息,默认3秒超时
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
clientConfig.getMqClientApiTimeout());
// 如果查询到默认路由信息
// 将路由信息中读写队列的个数替换为生产者默认队列个数
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
// 默认队列数量和Nameserver中队列数量,取最小数量
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// 查询自定义主题路由信息
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
}
// 非空表示找到Topic的路由信息,与本地缓存中的路由信息进行对比
// 判断路由信息是否发生了改变,如果未发生变化直接返回false
if (topicRouteData != null) {
// 本地缓存中取路由信息表
TopicRouteData old = this.topicRouteTable.get(topic);
// 对比Topic本地缓存和Nameserver 中的路由信息表
boolean changed = topicRouteDataIsChange(old, topicRouteData);
// 本地缓存和Nameserver中路由信息不一致,更新
if (!changed) {
changed = this.isNeedUpdateTopicRouteInfo(topic);
} else {
log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
}
if (changed) {
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas()) {
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
// Update Pub info
if (!producerTable.isEmpty()) {
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
// 有主题路由标识
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicPublishInfo(topic, publishInfo);
}
}
}
// Update sub info
if (!consumerTable.isEmpty()) {
Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
if (impl != null) {
impl.updateTopicSubscribeInfo(topic, subscribeInfo);
}
}
}
log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
}
} else {
log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
}
} catch (MQClientException e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
} catch (RemotingException e) {
log.error("updateTopicRouteInfoFromNameServer Exception", e);
throw new IllegalStateException(e);
} finally {
this.lockNamesrv.unlock();
}
} else {
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
}
} catch (InterruptedException e) {
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
}
return false;
}
3.3. 选择消息队列
提供两种消息队列选择机制,默认机制为不启用Broker 故障延迟机制,还有一种启用故障延迟机制。
通过sendLatencyFaultEnable 参数设置,是boolean值, false 为默认机制,true 为Broker 故障延迟机制
注意:如果sendLatencyFaultEnable 设置为false也能规避Broker,它们的区别在于如果sendLatencyFaultEnable设置为true,当消息发送消息失败时,就会悲观的认为Broker不可用,那么在接下来的一段时间内不在向其发送消息,直接略过该Broker。sendLatencyFaultEnable设置为true时,就只会在本次消息发送的重试过程中略过Broker,下一次消息发送还是会继续尝试使用该Broker
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 是否启用Broker 故障延迟机制,默认不启用,可以通过DefaultMQProducerImpl对象设置
// Broker 故障延迟机制
if (this.sendLatencyFaultEnable) {
try {
// Step1: 轮询获取一个消息队列
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 验证消息队列是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 尝试从避规的Broker中选择一个可用的Broker,如果没有找到返回null
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 一个主题的队列写数量
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// 没有启动Broker 故障延迟机制选择消息队列
// 默认机制
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
3.3.1. 默认机制
消息队列选择策略在当org.apache.rocketmq.client.latency.MQFaultStrategy.selectOneMessageQueue()中,如果sendLatencyFaultEnable=false 使用默认消息选择策略,调用TopicPublishInfo.selectOneMessageQueue()方法选择消息队列,默认消息队列选择采用轮询策略
这种机制的特点在于只针对当前发送的消息,在重试时规避掉不可用的Broker
/**
* 默认消息队列选择机制(默认机制)
* 没有开启Broker 故障延迟机制
* @param lastBrokerName 上一次选择执行发送失败的Broker
* @return
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// 消息发送第一次选择Queue
if (lastBrokerName == null) {、
// 第一次选择QUeue时,直接用自增(sendWhichQueue)变量和消息路由表中的队列数量取模
// 根据取模下标获取队列
return selectOneMessageQueue();
} else {
// 消息发送不是第一次选择Queue,此时说明至少有一次消息发送失败了,发生重试,
// 此时lastBrokerName 为上一次发送消息选择的Broker名称
for (int i = 0; i < this.messageQueueList.size(); i++) {
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 校验当前发送消息选择的Broker名称不能上一次发送消息选择的Broker名称是否相同
// 对于相同的消息队列,不做为本次消息队列选择的Broker
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
敲重点:为什么选择消息队列的时候会有一个lastBrokerName参数,首先说一下这个参数是上一次选择执行发送消息失败的Broker名称,第一次执行消息队列选择时,lastBrokerName 为Null,当消息发送失败重试时,lastBrokerName为上一次选择的队列所属的Broker名称。这么做的目的是避免重试消息发送时还是选择相同的Broker进行发送,如果重试选择的Broker和上一次发送选择的Broker是一样,发生失败的概率会很高。lastBrokerName 变化图
3.3.2. Broker 故障延迟机制
sendLatencyFaultEnable 设置为true时,使用故障延迟机制来规避Broker的选择,故障延迟机制的特点在于当前消息发送失败时,会将发送的Broker在一段时间规避,也就时说在一段时间内下一个消息发送时,该Broker不能被选择
/**
* 选择消息队列
* 提供两种消息队列选择机制,默认机制、Broker 故障延迟机制
*
* @param tpInfo
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 是否启用Broker 故障延迟机制,默认不启用,可以通过DefaultMQProducerImpl对象设置
if (this.sendLatencyFaultEnable) {
try {
// Step1: 轮询获取一个消息队列
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 验证消息队列是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 尝试从避规的Broker中选择一个可用的Broker,如果没有找到返回null
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
// 一个主题的队列写数量
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
// 没有启动Broker故障延迟机制选择消息队列
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
当消息发送失败时,会调用org.apache.rocketmq.client.latency.updateFaultItem()方法来更新当前Broker 不可用的时间(规避时间)
/**
* 消息发送异常时,更新失败Broker条目的延迟信息
*
* @param brokerName broker名称
* @param currentLatency 本次消息发送的延迟时间
* @param isolation 是否规避Broker {true:规避,false:不规避}
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
// 校验是否启用Broker故障延迟机制
if (this.sendLatencyFaultEnable) {
// 如果规避Broker,使用默认30s作为Broker故障规避时间,否则使用消息发送延迟时间来计算Broker故障规避时间
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
// 更新Broker 规避条目
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
计算Broker规避时长,在这个时间段内Broker将不在参与消息发送队列的负载。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
// 规避时长
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
/**
* 计算本次消息发送故障需要规避Broker的时长,也就是接下来多长时间内Broker将不参与消息发送队列负债
*
* @param currentLatency
* @return
*/
private long computeNotAvailableDuration(final long currentLatency) {
// 从latencyMax尾部开始寻找,找到第一个比currentLatency小的下标
// 然后从notAvailableDuration数组中获取需要规避的时长
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
3.4. 消息发送核心
3.4.1. 时序图
3.4.2. 源码分析
在选择消息发送队列后,接下来执行消息发送核心方法,在org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl()
/**
* 消息发送
* @param msg 待发送消息
* @param mq 消息发送的消息队列
* @param communicationMode 消息发送模式,SYNC\ASYNC\ONEWAY
* @param sendCallback 异步消息回调函数
* @param topicPublishInfo 主题路由信息
* @param timeout 消息发送超时时间
* @return
* @throws MQClientException
* @throws RemotingException
* @throws MQBrokerException
* @throws InterruptedException
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
// Step1:根据Broker名称获取Broker的网络地址
// 如果本地MQClientInstance实例中brokerAddrTable 为缓存Broker信息,从Nameserver主动更新topic的路由信息
// 如果路由信息更新后还是拿不到Broker信息就抛出异常(MQClientException)提示Broker不存在
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
if (null == brokerAddr) {
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
// 定义消息发送对象
SendMessageContext context = null;
if (brokerAddr != null) {
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
// 消息内容原始数据
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
// Step2: 为消息分配全局唯一ID
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
// 如果有命名空间,使用命名空间做为消息实例ID
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
int sysFlag = 0;
boolean msgBodyCompressed = false;
// 对消息体采用zip压缩,默认大于4k的消息体会进行压缩
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
// 是否为消息系统标记
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
// Step3: 如果注册了消息发送钩子函数,则执行消息发送之前的钩子函数(前置执行)
// 钩子函数通过DefaultMQProducerImpl.registerSendMessageHook()方法注册,钩子函数需要实现SendMessageHook接口
// 可以注册多个,MQ默认为我们注册了SendMessageTraceHookImpl 钩子函数
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
// 执行钩子函数,既执行实现了SendMessageHook 接口的类,并且注册到DefaultMQProducerImpl.sendMessageHookList
this.executeSendMessageHookBefore(context);
}
// Step4: 构建消息发送体,消息体信息主要包括:
// 生产者组、主题名称、默认创建主题key、该主题在单个Broker上的默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、
// 消息发送时间、消息标记(RocketMQ对消息中的标记不做任何处理,仅提供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 生产者组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// 主题名称
requestHeader.setTopic(msg.getTopic());
// 默认创建主题key
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 该主题在单个Broker上的默认队列数
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 队列ID(队列序号)
requestHeader.setQueueId(mq.getQueueId());
// 消息系统标记(MessageSysFlag)
requestHeader.setSysFlag(sysFlag);
// 消息发送时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息标记(RocketMQ对消息中的标记不做任何处理,仅提供应用程序使用)
requestHeader.setFlag(msg.getFlag());
// 消息扩展属性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// 消息重试次数
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否是批量消息
requestHeader.setBatch(msg instanceof MessageBatch);
// 处理重试主题
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
// 获取重试次数
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
// 消息重试次数
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
// 清除Message待发送消息重试次数
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
// 最大重试次数
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
// 设置最大重试次数
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
// 清除Message待发送消息最大重试次数
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
SendResult sendResult = null;
switch (communicationMode) {
// 异步消息,异步消息发送支持重试
// 注意异步发送仅在服务端收到消息发送响应请求时才会重试,如果出现网络异常、超时等情况不会重试
case ASYNC:
Message tmpMessage = msg;
boolean messageCloned = false;
// 如果消息内容被压缩,msg对象中的消息内容重置回原始(压缩之前)内容
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
msg.setBody(prevBody);
}
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
// 消息发送超时校验
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 异步发送消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
case ONEWAY:
// 单项发送
// 同步消息,同步发送消息不支持重试
case SYNC:
// 消息发送超时校验
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
// 同步发送消息
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
// Step6: 执行钩子成函数(后置执行)
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
return sendResult;
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
// 执行钩子成函数(后置执行)
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
// 执行钩子成函数(后置执行)
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
// 执行钩子成函数(后置执行)
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
3.4.2.1. 获取Broker的master节点网络地址
首先从本地缓存表中获取Broker的master节点的网络地址
/**
* 获取Broker的master节点网络地址
* @param brokerName
* @return
*/
public String findBrokerAddressInPublish(final String brokerName) {
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
if (map != null && !map.isEmpty()) {
return map.get(MixAll.MASTER_ID);
}
return null;
}
如果本地缓存表中没有,从Nameserver获取Broker的路由信息,具体逻辑看3.2章节查找主题路由信息
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 从生产者本地缓存获取topic的路由信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 如果本地缓存,向Nameserver查询该topic的路由信息
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 从Nameserver 查询topic的路由信息
// Nameserver和本地缓存的路由信息对比,如果不一致,替换本地缓存表中的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
// 是否查询到Topic的路由信息校验, 查询到直接返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
// 没有查询到Topic的路由信息,从Nameserver查询默认主题的路由信息
// 注意:只有autoCreateTopicEnable=true时,Topic没有路由信息时才会使用默认路由,否则抛出异常
// Nameserver默认路由信息和本地缓存的默认路由信息对比,如果不一致,替换本地缓存表中的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
3.4.2.2. 为消息分配唯一ID
public static void setUniqID(final Message msg) {
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}
消息id生成规则
public static String createUniqID() {
char[] sb = new char[LEN * 2];
System.arraycopy(FIX_STRING, 0, sb, 0, FIX_STRING.length);
long current = System.currentTimeMillis();
if (current >= nextStartTime) {
setStartTime(current);
}
int diff = (int)(current - startTime);
if (diff < 0 && diff > -1000_000) {
// may cause by NTP
diff = 0;
}
int pos = FIX_STRING.length;
UtilAll.writeInt(sb, pos, diff);
pos += 8;
UtilAll.writeShort(sb, pos, COUNTER.getAndIncrement());
return new String(sb);
}
3.4.2.3. 消息发送之前的钩子函数
在消息发送之前,如果有注册钩子函数,先执行消息发送之前的钩子函数。
钩子函数通过DefaultMQProducerImpl.registerSendMessageHook()方法注册,钩子函数需要实现SendMessageHook接口, 可以注册多个,MQ默认为我们注册了SendMessageTraceHookImpl 钩子函数
public void executeSendMessageHookBefore(final SendMessageContext context) {
if (!this.sendMessageHookList.isEmpty()) {
for (SendMessageHook hook : this.sendMessageHookList) {
try {
hook.sendMessageBefore(context);
} catch (Throwable e) {
log.warn("failed to executeSendMessageHookBefore", e);
}
}
}
}
3.4.2.4. 消息发送
首先构建消息发送体,消息体内容包括:生产者组、主题名称、默认创建主题key、该主题在单个Broker上的默认队列数、队列ID(队列序号)、消息系统标记(MessageSysFlag)、
消息发送时间、消息标记(RocketMQ对消息中的标记不做任何处理,仅提供应用程序使用)、消息扩展属性、消息重试次数、是否是批量消息
然后根据消息发送方式向Broker 发送消息,消息发送方式分为同步、异步、单项。
同步和单项消息发送有返回结果。异步消息发送没有返回结果,通过回调函处理消息发送结果
- 消息同步发送
消息同步发送方法org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync()
private SendResult sendMessageSync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request
) throws RemotingException, MQBrokerException, InterruptedException {
// Netty 同步发送消息
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
// 处理消息发送结果
return this.processSendResponse(brokerName, msg, response, addr);
}
同步发送消息
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
try {
// 消息发送前置处理
doBeforeRpcHooks(addr, request);
long costTime = System.currentTimeMillis() - beginStartTime;
// 超时校验
if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");
}
// 通过Netty调用Broker 发送消息
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
// 消息发送后置处理
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
return response;
} catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
// 关闭通道
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
}
} else {
// 关闭通道
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
}
处理同步发送消息结果
private SendResult processSendResponse(
final String brokerName,
final Message msg,
final RemotingCommand response,
final String addr
) throws MQBrokerException, RemotingCommandException {
SendStatus sendStatus;
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT: {
sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
break;
}
case ResponseCode.FLUSH_SLAVE_TIMEOUT: {
sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
break;
}
case ResponseCode.SLAVE_NOT_AVAILABLE: {
sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
break;
}
case ResponseCode.SUCCESS: {
sendStatus = SendStatus.SEND_OK;
break;
}
default: {
throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
}
// 构建同步发送消息的响应结果对象
SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
//If namespace not null , reset Topic without namespace.
String topic = msg.getTopic();
if (StringUtils.isNotEmpty(this.clientConfig.getNamespace())) {
topic = NamespaceUtil.withoutNamespace(topic, this.clientConfig.getNamespace());
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, responseHeader.getQueueId());
String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
if (msg instanceof MessageBatch) {
StringBuilder sb = new StringBuilder();
for (Message message : (MessageBatch) msg) {
sb.append(sb.length() == 0 ? "" : ",").append(MessageClientIDSetter.getUniqID(message));
}
uniqMsgId = sb.toString();
}
// 同步发送消息结果
SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset());
sendResult.setTransactionId(responseHeader.getTransactionId());
String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
if (regionId == null || regionId.isEmpty()) {
regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
if (traceOn != null && traceOn.equals("false")) {
sendResult.setTraceOn(false);
} else {
sendResult.setTraceOn(true);
}
sendResult.setRegionId(regionId);
return sendResult;
}
异步发送
private void sendMessageAsync(
final String addr,
final String brokerName,
final Message msg,
final long timeoutMillis,
final RemotingCommand request,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final MQClientInstance instance,
final int retryTimesWhenSendFailed,
final AtomicInteger times,
final SendMessageContext context,
final DefaultMQProducerImpl producer
) throws InterruptedException, RemotingException {
final long beginStartTime = System.currentTimeMillis();
try {
// Netty 请求发送消息,接收该请求的类是org.apache.rocketmq.broker.processor.SendMessageProcessor
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
long cost = System.currentTimeMillis() - beginStartTime;
RemotingCommand response = responseFuture.getResponseCommand();
if (null == sendCallback && response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
if (context != null && sendResult != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
return;
}
if (response != null) {
try {
SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response, addr);
assert sendResult != null;
if (context != null) {
context.setSendResult(sendResult);
context.getProducer().executeSendMessageHookAfter(context);
}
try {
sendCallback.onSuccess(sendResult);
} catch (Throwable e) {
}
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
} catch (Exception e) {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, e, context, false, producer);
}
} else {
producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
if (!responseFuture.isSendRequestOK()) {
MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else if (responseFuture.isTimeout()) {
MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
} else {
MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
}
});
} catch (Exception ex) {
long cost = System.currentTimeMillis() - beginStartTime;
producer.updateFaultItem(brokerName, cost, true);
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
retryTimesWhenSendFailed, times, ex, context, true, producer);
}
}
3.4.2.5. 消息接收
生产者将消息发送至Broker后,Netty的code是RequestCode._SEND_MESSAGE,在Broker中通过_org.apache.rocketmq.broker.processor.SendMessageProcessor.processRequest()方法接收消息发送请求
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
RemotingCommand response = null;
try {
// 处理消息发送请求
response = asyncProcessRequest(ctx, request).get();
} catch (InterruptedException | ExecutionException e) {
log.error("process SendMessage error, request : " + request.toString(), e);
}
return response;
}
消息接收核心入口方法
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final SendMessageContext mqtraceContext;
switch (request.getCode()) {
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
// 解析消息发送对象,转换为SendMessageRequestHeader 对象
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null) {
return CompletableFuture.completedFuture(null);
}
// 构建消息对象
mqtraceContext = buildMsgContext(ctx, requestHeader);
// 执行钩子函数(前置处理)
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch()) {
// 批量消息异步发送
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
} else {
// 单个消息异步发送
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}