- Producer启动
- 1):Producer启动代码执行
- DefaultMQProducerImpl#start
- MQClientManager#getOrCreateMQClientInstance
- MQClientInstance#registerProducer
- MQClientInstance#start
- MQClientAPIImpl#start
- MQClientInstance#startScheduledTask
- MQClientInstance#sendHeartbeatToAllBrokerWithLock
- MQClientInstance#sendHeartbeatToAllBroker
- DefaultMQProducerImpl#startScheduledTask
- RequestFutureTable#scanExpiredRequest
- 2):Producer启动逻辑总结
- 1):Producer启动代码执行
Producer启动
1):Producer启动代码执行
首先利用DefaultMQProducer实现类进行启动。
//设置生产组
this.setProducerGroup(withNamespace(this.producerGroup));
//生产者实现启动
this.defaultMQProducerImpl.start();
//判断消息追踪器是否为空
if (null != traceDispatcher) {
try {
//消息追踪器启动
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
DefaultMQProducerImpl#start
//serviceState默认为CREATE_JUST(刚创建)
switch (this.serviceState) {
case CREATE_JUST:
//设置服务状态为启动失败
this.serviceState = ServiceState.START_FAILED;
//检查配置
this.checkConfig();
//判断生产组是否符合要求
if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
//更改InstanceName为进程ID
this.defaultMQProducer.changeInstanceNameToPID();
}
//获取或创建MQ客户端实例
this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
//注册生产者
boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
if (!registerOK) {
//注册失败抛异常
this.serviceState = ServiceState.CREATE_JUST;
throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
//增加Topic发布信息
this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
//启动client属性,默认true
if (startFactory) {
//MQ客户端实例启动
mQClientFactory.start();
}
//打印生产者启动成功
log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
//将服务状态改为运行中
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The producer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
}
//发送心跳给所有Broker
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
//启动定时Task
this.startScheduledTask();
MQClientManager#getOrCreateMQClientInstance
//clientId = 客户端IP@DEFAULT
String clientId = clientConfig.buildMQClientId();
//根据clientId获取MQ客户端实例
MQClientInstance instance = this.factoryTable.get(clientId);
if (null == instance) {
//创建MQ客户端实例
instance =
new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
//判断是否已创建,已创建则用已有实例
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
return instance;
MQClientInstance#registerProducer
if (null == group || null == producer) {
return false;
}
//放入生产者实例集合
MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
if (prev != null) {
log.warn("the producer group[{}] exist already.", group);
return false;
}
return true;
MQClientInstance#start
//加上同步锁,锁对象为MQClientInstance,一个jvm只有一个实例
synchronized (this) {
//switch状态
switch (this.serviceState) {
case CREATE_JUST:
//设置当前状态为失败,避免代码执行过程中异常退出
this.serviceState = ServiceState.START_FAILED;
// 如果当前配置中的NameServer地址为空,则检索NameServer地址
if (null == this.clientConfig.getNamesrvAddr()) {
//检索NameServer地址
this.mQClientAPIImpl.fetchNameServerAddr();
}
// 启动请求-接收通道
this.mQClientAPIImpl.start();
// 启动定时任务Task
this.startScheduledTask();
// 启动拉取消息服务
this.pullMessageService.start();
// 启动负载均衡服务
this.rebalanceService.start();
// 启动推送服务
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
//设置当前状态为运行中
this.serviceState = ServiceState.RUNNING;
break;
case START_FAILED:
//启动失败,抛出异常
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
MQClientAPIImpl#start
//远程客户端启动
this.remotingClient.start();
MQClientInstance#startScheduledTask
//若配置NameServer地址为空,开启检索NameServer地址,10后执行,120s执行一次
if (null == this.clientConfig.getNamesrvAddr()) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
}
//从NameServer中更新Topic路由信息,默认30s执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
//心跳续约,1s后执行,默认30s执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//清除下线Broker信息
MQClientInstance.this.cleanOfflineBroker();
//发送心跳信息给所有Broker
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
//更新消费进度,10s后执行,每隔5s执行一次(逻辑暂不确定,回头再改)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
//调整线程池,1秒执行一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
MQClientInstance#sendHeartbeatToAllBrokerWithLock
//重入锁加锁
if (this.lockHeartbeat.tryLock()) {
try {
//发送心跳给所有Broker
this.sendHeartbeatToAllBroker();
//上传过滤资源(暂时还没搞清楚用来做什么)
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
//解锁
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed. [{}]", this.clientId);
}
MQClientInstance#sendHeartbeatToAllBroker
//组装心跳数据,若是producer,那就是producer心跳数据。若是consumer,那就是consumer心跳数据
final HeartbeatData heartbeatData = this.prepareHeartbeatData();
final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
//producer跟consumer集合都为空,即返回
if (producerEmpty && consumerEmpty) {
log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
return;
}
//若Broker地址列表不为空
if (!this.brokerAddrTable.isEmpty()) {
//获取心跳次数
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, HashMap<Long, String>> entry = it.next();
String brokerName = entry.getKey();
HashMap<Long, String> oneTable = entry.getValue();
if (oneTable != null) {
for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
Long id = entry1.getKey();
String addr = entry1.getValue();
if (addr != null) {
if (consumerEmpty) {
if (id != MixAll.MASTER_ID)
continue;
}
try {
//发送心跳给Broker
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
} else {
log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
id, addr, e);
}
}
}
}
}
}
}
DefaultMQProducerImpl#startScheduledTask
//若生产者数量从0变成1时,执行扫描超时请求,3s后执行,1s一次
if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
}
RequestFutureTable#scanExpiredRequest
final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, RequestResponseFuture> next = it.next();
RequestResponseFuture rep = next.getValue();
//删除超时Request连接
if (rep.isTimeout()) {
it.remove();
rfList.add(rep);
log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
}
}
//轮询超时request,超时request报错
for (RequestResponseFuture rf : rfList) {
try {
Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
rf.setCause(cause);
rf.executeRequestCallback();
} catch (Throwable e) {
log.warn("scanResponseTable, operationComplete Exception", e);
}
}
2):Producer启动逻辑总结
- 默认Producer实例启动。
- 检查producer配置,主要时producer生产者名称。若producerGroup不是默认名称,进行更改名称。
- 创建或获取MQ客户端实例(一个JVM只有一个)
- 在本地根据producerGroup注册producer
- 生成topic订阅信息
- 启动MQ客户端实例
- 若NameServer地址为空,检索NameServer地址
- 启动请求响应渠道服务
- 启动定时任务task
- 若NameServer地址为空,启动检索NameServer地址Task
- 启动更新Topic路由信息Task
- 启动清除下线Broker,发送心跳信息给所有Broker的Task
- 启动更新消费进度Task
- 启动调整线程池的Task
- 启动拉取消息服务
- 启动负载均衡服务
- 启动推送服务
- 更改状态为运行中
- 更改状态为运行中
- 发送心跳信息给所有Broker
- 启动定时任务task
- producer数量等于1时,启动扫描所有超时Request的Task
- 若追踪调度器不为空,启动追踪调度器