Producer启动

1):Producer启动代码执行

首先利用DefaultMQProducer实现类进行启动。

  1. //设置生产组
  2. this.setProducerGroup(withNamespace(this.producerGroup));
  3. //生产者实现启动
  4. this.defaultMQProducerImpl.start();
  5. //判断消息追踪器是否为空
  6. if (null != traceDispatcher) {
  7. try {
  8. //消息追踪器启动
  9. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
  10. } catch (MQClientException e) {
  11. log.warn("trace dispatcher start failed ", e);
  12. }
  13. }

DefaultMQProducerImpl#start

  1. //serviceState默认为CREATE_JUST(刚创建)
  2. switch (this.serviceState) {
  3. case CREATE_JUST:
  4. //设置服务状态为启动失败
  5. this.serviceState = ServiceState.START_FAILED;
  6. //检查配置
  7. this.checkConfig();
  8. //判断生产组是否符合要求
  9. if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
  10. //更改InstanceName为进程ID
  11. this.defaultMQProducer.changeInstanceNameToPID();
  12. }
  13. //获取或创建MQ客户端实例
  14. this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
  15. //注册生产者
  16. boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
  17. if (!registerOK) {
  18. //注册失败抛异常
  19. this.serviceState = ServiceState.CREATE_JUST;
  20. throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
  21. + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
  22. null);
  23. }
  24. //增加Topic发布信息
  25. this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
  26. //启动client属性,默认true
  27. if (startFactory) {
  28. //MQ客户端实例启动
  29. mQClientFactory.start();
  30. }
  31. //打印生产者启动成功
  32. log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
  33. this.defaultMQProducer.isSendMessageWithVIPChannel());
  34. //将服务状态改为运行中
  35. this.serviceState = ServiceState.RUNNING;
  36. break;
  37. case RUNNING:
  38. case START_FAILED:
  39. case SHUTDOWN_ALREADY:
  40. throw new MQClientException("The producer service state not OK, maybe started once, "
  41. + this.serviceState
  42. + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
  43. null);
  44. default:
  45. break;
  46. }
  47. //发送心跳给所有Broker
  48. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
  49. //启动定时Task
  50. this.startScheduledTask();

MQClientManager#getOrCreateMQClientInstance

  1. //clientId = 客户端IP@DEFAULT
  2. String clientId = clientConfig.buildMQClientId();
  3. //根据clientId获取MQ客户端实例
  4. MQClientInstance instance = this.factoryTable.get(clientId);
  5. if (null == instance) {
  6. //创建MQ客户端实例
  7. instance =
  8. new MQClientInstance(clientConfig.cloneClientConfig(),
  9. this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
  10. //判断是否已创建,已创建则用已有实例
  11. MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
  12. if (prev != null) {
  13. instance = prev;
  14. log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
  15. } else {
  16. log.info("Created new MQClientInstance for clientId:[{}]", clientId);
  17. }
  18. }
  19. return instance;

MQClientInstance#registerProducer

  1. if (null == group || null == producer) {
  2. return false;
  3. }
  4. //放入生产者实例集合
  5. MQProducerInner prev = this.producerTable.putIfAbsent(group, producer);
  6. if (prev != null) {
  7. log.warn("the producer group[{}] exist already.", group);
  8. return false;
  9. }
  10. return true;

MQClientInstance#start

  1. //加上同步锁,锁对象为MQClientInstance,一个jvm只有一个实例
  2. synchronized (this) {
  3. //switch状态
  4. switch (this.serviceState) {
  5. case CREATE_JUST:
  6. //设置当前状态为失败,避免代码执行过程中异常退出
  7. this.serviceState = ServiceState.START_FAILED;
  8. // 如果当前配置中的NameServer地址为空,则检索NameServer地址
  9. if (null == this.clientConfig.getNamesrvAddr()) {
  10. //检索NameServer地址
  11. this.mQClientAPIImpl.fetchNameServerAddr();
  12. }
  13. // 启动请求-接收通道
  14. this.mQClientAPIImpl.start();
  15. // 启动定时任务Task
  16. this.startScheduledTask();
  17. // 启动拉取消息服务
  18. this.pullMessageService.start();
  19. // 启动负载均衡服务
  20. this.rebalanceService.start();
  21. // 启动推送服务
  22. this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
  23. log.info("the client factory [{}] start OK", this.clientId);
  24. //设置当前状态为运行中
  25. this.serviceState = ServiceState.RUNNING;
  26. break;
  27. case START_FAILED:
  28. //启动失败,抛出异常
  29. throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
  30. default:
  31. break;
  32. }
  33. }

MQClientAPIImpl#start

  1. //远程客户端启动
  2. this.remotingClient.start();

MQClientInstance#startScheduledTask

  1. //若配置NameServer地址为空,开启检索NameServer地址,10后执行,120s执行一次
  2. if (null == this.clientConfig.getNamesrvAddr()) {
  3. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  4. @Override
  5. public void run() {
  6. try {
  7. MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
  8. } catch (Exception e) {
  9. log.error("ScheduledTask fetchNameServerAddr exception", e);
  10. }
  11. }
  12. }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
  13. }
  14. //从NameServer中更新Topic路由信息,默认30s执行一次
  15. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  16. @Override
  17. public void run() {
  18. try {
  19. MQClientInstance.this.updateTopicRouteInfoFromNameServer();
  20. } catch (Exception e) {
  21. log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
  22. }
  23. }
  24. }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
  25. //心跳续约,1s后执行,默认30s执行一次
  26. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  27. @Override
  28. public void run() {
  29. try {
  30. //清除下线Broker信息
  31. MQClientInstance.this.cleanOfflineBroker();
  32. //发送心跳信息给所有Broker
  33. MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
  34. } catch (Exception e) {
  35. log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
  36. }
  37. }
  38. }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
  39. //更新消费进度,10s后执行,每隔5s执行一次(逻辑暂不确定,回头再改)
  40. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  41. @Override
  42. public void run() {
  43. try {
  44. MQClientInstance.this.persistAllConsumerOffset();
  45. } catch (Exception e) {
  46. log.error("ScheduledTask persistAllConsumerOffset exception", e);
  47. }
  48. }
  49. }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
  50. //调整线程池,1秒执行一次
  51. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  52. @Override
  53. public void run() {
  54. try {
  55. MQClientInstance.this.adjustThreadPool();
  56. } catch (Exception e) {
  57. log.error("ScheduledTask adjustThreadPool exception", e);
  58. }
  59. }
  60. }, 1, 1, TimeUnit.MINUTES);

MQClientInstance#sendHeartbeatToAllBrokerWithLock

  1. //重入锁加锁
  2. if (this.lockHeartbeat.tryLock()) {
  3. try {
  4. //发送心跳给所有Broker
  5. this.sendHeartbeatToAllBroker();
  6. //上传过滤资源(暂时还没搞清楚用来做什么)
  7. this.uploadFilterClassSource();
  8. } catch (final Exception e) {
  9. log.error("sendHeartbeatToAllBroker exception", e);
  10. } finally {
  11. //解锁
  12. this.lockHeartbeat.unlock();
  13. }
  14. } else {
  15. log.warn("lock heartBeat, but failed. [{}]", this.clientId);
  16. }

MQClientInstance#sendHeartbeatToAllBroker

  1. //组装心跳数据,若是producer,那就是producer心跳数据。若是consumer,那就是consumer心跳数据
  2. final HeartbeatData heartbeatData = this.prepareHeartbeatData();
  3. final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty();
  4. final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty();
  5. //producer跟consumer集合都为空,即返回
  6. if (producerEmpty && consumerEmpty) {
  7. log.warn("sending heartbeat, but no consumer and no producer. [{}]", this.clientId);
  8. return;
  9. }
  10. //若Broker地址列表不为空
  11. if (!this.brokerAddrTable.isEmpty()) {
  12. //获取心跳次数
  13. long times = this.sendHeartbeatTimesTotal.getAndIncrement();
  14. Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
  15. while (it.hasNext()) {
  16. Entry<String, HashMap<Long, String>> entry = it.next();
  17. String brokerName = entry.getKey();
  18. HashMap<Long, String> oneTable = entry.getValue();
  19. if (oneTable != null) {
  20. for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
  21. Long id = entry1.getKey();
  22. String addr = entry1.getValue();
  23. if (addr != null) {
  24. if (consumerEmpty) {
  25. if (id != MixAll.MASTER_ID)
  26. continue;
  27. }
  28. try {
  29. //发送心跳给Broker
  30. int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, clientConfig.getMqClientApiTimeout());
  31. if (!this.brokerVersionTable.containsKey(brokerName)) {
  32. this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
  33. }
  34. this.brokerVersionTable.get(brokerName).put(addr, version);
  35. if (times % 20 == 0) {
  36. log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
  37. log.info(heartbeatData.toString());
  38. }
  39. } catch (Exception e) {
  40. if (this.isBrokerInNameServer(addr)) {
  41. log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
  42. } else {
  43. log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
  44. id, addr, e);
  45. }
  46. }
  47. }
  48. }
  49. }
  50. }
  51. }

DefaultMQProducerImpl#startScheduledTask

  1. //若生产者数量从0变成1时,执行扫描超时请求,3s后执行,1s一次
  2. if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
  3. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  4. @Override
  5. public void run() {
  6. try {
  7. RequestFutureTable.scanExpiredRequest();
  8. } catch (Throwable e) {
  9. log.error("scan RequestFutureTable exception", e);
  10. }
  11. }
  12. }, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
  13. }

RequestFutureTable#scanExpiredRequest

  1. final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
  2. Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
  3. while (it.hasNext()) {
  4. Map.Entry<String, RequestResponseFuture> next = it.next();
  5. RequestResponseFuture rep = next.getValue();
  6. //删除超时Request连接
  7. if (rep.isTimeout()) {
  8. it.remove();
  9. rfList.add(rep);
  10. log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
  11. }
  12. }
  13. //轮询超时request,超时request报错
  14. for (RequestResponseFuture rf : rfList) {
  15. try {
  16. Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
  17. rf.setCause(cause);
  18. rf.executeRequestCallback();
  19. } catch (Throwable e) {
  20. log.warn("scanResponseTable, operationComplete Exception", e);
  21. }
  22. }

2):Producer启动逻辑总结

  1. 默认Producer实例启动。
    1. 检查producer配置,主要时producer生产者名称。若producerGroup不是默认名称,进行更改名称。
    2. 创建或获取MQ客户端实例(一个JVM只有一个)
    3. 在本地根据producerGroup注册producer
    4. 生成topic订阅信息
    5. 启动MQ客户端实例
      1. 若NameServer地址为空,检索NameServer地址
      2. 启动请求响应渠道服务
      3. 启动定时任务task
        1. 若NameServer地址为空,启动检索NameServer地址Task
        2. 启动更新Topic路由信息Task
        3. 启动清除下线Broker,发送心跳信息给所有Broker的Task
        4. 启动更新消费进度Task
        5. 启动调整线程池的Task
      4. 启动拉取消息服务
      5. 启动负载均衡服务
      6. 启动推送服务
      7. 更改状态为运行中
    6. 更改状态为运行中
    7. 发送心跳信息给所有Broker
    8. 启动定时任务task
      1. producer数量等于1时,启动扫描所有超时Request的Task
  2. 若追踪调度器不为空,启动追踪调度器