- Producer启动
- 1):Producer启动代码执行
- DefaultMQProducerImpl#start
- MQClientManager#getOrCreateMQClientInstance
- MQClientInstance#registerProducer
- MQClientInstance#start
- MQClientAPIImpl#start
- MQClientInstance#startScheduledTask
- MQClientInstance#sendHeartbeatToAllBrokerWithLock
- MQClientInstance#sendHeartbeatToAllBroker
- DefaultMQProducerImpl#startScheduledTask
- RequestFutureTable#scanExpiredRequest
- 2):Producer启动逻辑总结
- 1):Producer启动代码执行
Producer启动
1):Producer启动代码执行
首先利用DefaultMQProducer实现类进行启动。
//设置生产组this.setProducerGroup(withNamespace(this.producerGroup));//生产者实现启动this.defaultMQProducerImpl.start();//判断消息追踪器是否为空if (null != traceDispatcher) {try {//消息追踪器启动traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());} catch (MQClientException e) {log.warn("trace dispatcher start failed ", e);}}
DefaultMQProducerImpl#start
//serviceState默认为CREATE_JUST(刚创建)switch (this.serviceState) {case CREATE_JUST://设置服务状态为启动失败this.serviceState = ServiceState.START_FAILED;//检查配置this.checkConfig();//判断生产组是否符合要求if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {//更改InstanceName为进程IDthis.defaultMQProducer.changeInstanceNameToPID();}//获取或创建MQ客户端实例this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);//注册生产者boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);if (!registerOK) {//注册失败抛异常this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}//增加Topic发布信息this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());//启动client属性,默认trueif (startFactory) {//MQ客户端实例启动mQClientFactory.start();}//打印生产者启动成功log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),this.defaultMQProducer.isSendMessageWithVIPChannel());//将服务状态改为运行中this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The producer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}//发送心跳给所有Brokerthis.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//启动定时Taskthis.startScheduledTask();
MQClientManager#getOrCreateMQClientInstance
//clientId = 客户端IP@DEFAULTString clientId = clientConfig.buildMQClientId();//根据clientId获取MQ客户端实例MQClientInstance instance = this.factoryTable.get(clientId);if (null == instance) {//创建MQ客户端实例instance =new MQClientInstance(clientConfig.cloneClientConfig(),this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);//判断是否已创建,已创建则用已有实例MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);if (prev != null) {instance = prev;log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);} else {log.info("Created new MQClientInstance for clientId:[{}]", clientId);}}return instance;
MQClientInstance#registerProducer
if (null == group || null == producer) {return false;}//放入生产者实例集合MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);if (prev != null) {log.warn("the producer group[{}] exist already.", group);return false;}return true;
MQClientInstance#start
//加上同步锁,锁对象为MQClientInstance,一个jvm只有一个实例synchronized (this) {//switch状态switch (this.serviceState) {case CREATE_JUST://设置当前状态为失败,避免代码执行过程中异常退出this.serviceState = ServiceState.START_FAILED;// 如果当前配置中的NameServer地址为空,则检索NameServer地址if (null == this.clientConfig.getNamesrvAddr()) {//检索NameServer地址this.mQClientAPIImpl.fetchNameServerAddr();}// 启动请求-接收通道this.mQClientAPIImpl.start();// 启动定时任务Taskthis.startScheduledTask();// 启动拉取消息服务this.pullMessageService.start();// 启动负载均衡服务this.rebalanceService.start();// 启动推送服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);//设置当前状态为运行中this.serviceState = ServiceState.RUNNING;break;case START_FAILED://启动失败,抛出异常throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
MQClientAPIImpl#start
//远程客户端启动this.remotingClient.start();
MQClientInstance#startScheduledTask
//若配置NameServer地址为空,开启检索NameServer地址,10后执行,120s执行一次if (null == this.clientConfig.getNamesrvAddr()) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();} catch (Exception e) {log.error("ScheduledTask fetchNameServerAddr exception", e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}//从NameServer中更新Topic路由信息,默认30s执行一次this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.updateTopicRouteInfoFromNameServer();} catch (Exception e) {log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);}}}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);//心跳续约,1s后执行,默认30s执行一次this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {//清除下线Broker信息MQClientInstance.this.cleanOfflineBroker();//发送心跳信息给所有BrokerMQClientInstance.this.sendHeartbeatToAllBrokerWithLock();} catch (Exception e) {log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);}}}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);//更新消费进度,10s后执行,每隔5s执行一次(逻辑暂不确定,回头再改)this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.persistAllConsumerOffset();} catch (Exception e) {log.error("ScheduledTask persistAllConsumerOffset exception", e);}}}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//调整线程池,1秒执行一次this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {MQClientInstance.this.adjustThreadPool();} catch (Exception e) {log.error("ScheduledTask adjustThreadPool exception", e);}}}, 1, 1, TimeUnit.MINUTES);
MQClientInstance#sendHeartbeatToAllBrokerWithLock
//重入锁加锁if (this.lockHeartbeat.tryLock()) {try {//发送心跳给所有Brokerthis.sendHeartbeatToAllBroker();//上传过滤资源(暂时还没搞清楚用来做什么)this.uploadFilterClassSource();} catch (final Exception e) {log.error("sendHeartbeatToAllBroker exception", e);} finally {//解锁this.lockHeartbeat.unlock();}} else {log.warn("lock heartBeat, but failed. [{}]", this.clientId);}
MQClientInstance#sendHeartbeatToAllBroker
//组装心跳数据,若是producer,那就是producer心跳数据。若是consumer,那就是consumer心跳数据final HeartbeatData heartbeatData = this.prepareHeartbeatData();final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();//producer跟consumer集合都为空,即返回if (producerEmpty && consumerEmpty) {log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);return;}//若Broker地址列表不为空if (!this.brokerAddrTable.isEmpty()) {//获取心跳次数long times = this.sendHeartbeatTimesTotal.getAndIncrement();Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, HashMap<Long, String>> entry = it.next();String brokerName = entry.getKey();HashMap<Long, String> oneTable = entry.getValue();if (oneTable != null) {for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {Long id = entry1.getKey();String addr = entry1.getValue();if (addr != null) {if (consumerEmpty) {if (id != MixAll.MASTER_ID)continue;}try {//发送心跳给Brokerint version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());if (!this.brokerVersionTable.containsKey(brokerName)) {this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));}this.brokerVersionTable.get(brokerName).put(addr, version);if (times % 20 == 0) {log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);log.info(heartbeatData.toString());}} catch (Exception e) {if (this.isBrokerInNameServer(addr)) {log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);} else {log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,id, addr, e);}}}}}}}
DefaultMQProducerImpl#startScheduledTask
//若生产者数量从0变成1时,执行扫描超时请求,3s后执行,1s一次if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {RequestFutureTable.scanExpiredRequest();} catch (Throwable e) {log.error("scan RequestFutureTable exception", e);}}}, 1000 * 3, 1000, TimeUnit.MILLISECONDS);}
RequestFutureTable#scanExpiredRequest
final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();while (it.hasNext()) {Map.Entry<String, RequestResponseFuture> next = it.next();RequestResponseFuture rep = next.getValue();//删除超时Request连接if (rep.isTimeout()) {it.remove();rfList.add(rep);log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());}}//轮询超时request,超时request报错for (RequestResponseFuture rf : rfList) {try {Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");rf.setCause(cause);rf.executeRequestCallback();} catch (Throwable e) {log.warn("scanResponseTable, operationComplete Exception", e);}}
2):Producer启动逻辑总结
- 默认Producer实例启动。
- 检查producer配置,主要时producer生产者名称。若producerGroup不是默认名称,进行更改名称。
- 创建或获取MQ客户端实例(一个JVM只有一个)
- 在本地根据producerGroup注册producer
- 生成topic订阅信息
- 启动MQ客户端实例
- 若NameServer地址为空,检索NameServer地址
- 启动请求响应渠道服务
- 启动定时任务task
- 若NameServer地址为空,启动检索NameServer地址Task
- 启动更新Topic路由信息Task
- 启动清除下线Broker,发送心跳信息给所有Broker的Task
- 启动更新消费进度Task
- 启动调整线程池的Task
- 启动拉取消息服务
- 启动负载均衡服务
- 启动推送服务
- 更改状态为运行中
- 更改状态为运行中
- 发送心跳信息给所有Broker
- 启动定时任务task
- producer数量等于1时,启动扫描所有超时Request的Task
- 若追踪调度器不为空,启动追踪调度器
