上篇环境搭建及NamesrvController分析: RocketMQ源码环境搭建&NamesrvController分析

BrokerController 分析

BrokerStartup#main 是 Broker 的启动入口,它内部调用:

BrokerStartup#start(createBrokerController(args))

createBrokerController(args)

  1. public static BrokerController createBrokerController(String[] args) {
  2. // NamesrvController一样,设置环境变量
  3. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  4. // 数据校验,设默认值
  5. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
  6. NettySystemConfig.socketSndbufSize = 131072;
  7. }
  8. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
  9. NettySystemConfig.socketRcvbufSize = 131072;
  10. }
  11. try {
  12. // 创建命令行的选项
  13. Options options = ServerUtil.buildCommandlineOptions(new Options());
  14. commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
  15. new PosixParser());
  16. if (null == commandLine) {
  17. System.exit(-1);
  18. }
  19. // 创建BrokerConfig,NettyServerConfig,NettyClientConfig 三个配置类
  20. final BrokerConfig brokerConfig = new BrokerConfig();
  21. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  22. final NettyClientConfig nettyClientConfig = new NettyClientConfig();
  23. // 设置是否使用安全链接
  24. nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
  25. String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
  26. // 设置Broker的监听端口
  27. nettyServerConfig.setListenPort(10911);
  28. // 设置消息的存储配置类
  29. final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
  30. // 刚刚创建,messageStoreConfig的默认是是ASYNC_MASTER,这个判断永远不成立
  31. // 有点不明白,为什么会出现这个东西
  32. if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
  33. int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
  34. messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
  35. }
  36. // 加载配置文件
  37. if (commandLine.hasOption('c')) {
  38. String file = commandLine.getOptionValue('c');
  39. if (file != null) {
  40. configFile = file;
  41. InputStream in = new BufferedInputStream(new FileInputStream(file));
  42. properties = new Properties();
  43. properties.load(in);
  44. // 将properties中的内容,通过反射,设置到各个配置类中
  45. properties2SystemEnv(properties);
  46. MixAll.properties2Object(properties, brokerConfig);
  47. MixAll.properties2Object(properties, nettyServerConfig);
  48. MixAll.properties2Object(properties, nettyClientConfig);
  49. MixAll.properties2Object(properties, messageStoreConfig);
  50. // 然后,设置Broker配置文件的路径
  51. BrokerPathConfigHelper.setBrokerConfigPath(file);
  52. in.close();
  53. }
  54. }
  55. // 把命令行的参数,先弄成一个properties,然后设置到BrokerConfig中
  56. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
  57. // RocketMQ home没设置,报错
  58. if (null == brokerConfig.getRocketmqHome()) {
  59. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
  60. System.exit(-2);
  61. }
  62. // 获取到配置文件中配置的Namesrv的地址
  63. String namesrvAddr = brokerConfig.getNamesrvAddr();
  64. if (null != namesrvAddr) {
  65. try {
  66. // 根据分号进行分割namesrv的地址(因为可以配置多个)
  67. String[] addrArray = namesrvAddr.split(";");
  68. for (String addr : addrArray) {
  69. // 尝试着把他封装成一个SocketAddress,如果转化不了,就会抛出异常
  70. // 说明配置的ip不会,就直接退出
  71. RemotingUtil.string2SocketAddress(addr);
  72. }
  73. } catch (Exception e) {
  74. System.out.printf(
  75. "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
  76. namesrvAddr);
  77. System.exit(-3);
  78. }
  79. }
  80. // 在上面,已经把配置文件中的值,设置到配置文件中的,根据配置文件中配置的角色,设置BrokerId
  81. switch (messageStoreConfig.getBrokerRole()) {
  82. case ASYNC_MASTER:
  83. case SYNC_MASTER:
  84. brokerConfig.setBrokerId(MixAll.MASTER_ID);
  85. break;
  86. case SLAVE:
  87. if (brokerConfig.getBrokerId() <= 0) {
  88. System.out.printf("Slave's brokerId must be > 0");
  89. System.exit(-3);
  90. }
  91. break;
  92. default:
  93. break;
  94. }
  95. if (messageStoreConfig.isEnableDLegerCommitLog()) {
  96. brokerConfig.setBrokerId(-1);
  97. }
  98. messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
  99. // 获取上下文,读取配置文件
  100. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  101. JoranConfigurator configurator = new JoranConfigurator();
  102. configurator.setContext(lc);
  103. lc.reset();
  104. configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
  105. // 根据命令行设置的东西,读取到配置类中
  106. if (commandLine.hasOption('p')) {
  107. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  108. MixAll.printObjectProperties(console, brokerConfig);
  109. MixAll.printObjectProperties(console, nettyServerConfig);
  110. MixAll.printObjectProperties(console, nettyClientConfig);
  111. MixAll.printObjectProperties(console, messageStoreConfig);
  112. System.exit(0);
  113. } else if (commandLine.hasOption('m')) {
  114. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  115. MixAll.printObjectProperties(console, brokerConfig, true);
  116. MixAll.printObjectProperties(console, nettyServerConfig, true);
  117. MixAll.printObjectProperties(console, nettyClientConfig, true);
  118. MixAll.printObjectProperties(console, messageStoreConfig, true);
  119. System.exit(0);
  120. }
  121. // 将配置信息,打印到日志中
  122. log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
  123. MixAll.printObjectProperties(log, brokerConfig);
  124. MixAll.printObjectProperties(log, nettyServerConfig);
  125. MixAll.printObjectProperties(log, nettyClientConfig);
  126. MixAll.printObjectProperties(log, messageStoreConfig);
  127. // 创建BrokerController
  128. final BrokerController controller = new BrokerController(
  129. brokerConfig,
  130. nettyServerConfig,
  131. nettyClientConfig,
  132. messageStoreConfig);
  133. // remember all configs to prevent discard
  134. controller.getConfiguration().registerConfig(properties);
  135. boolean initResult = controller.initialize();
  136. if (!initResult) {
  137. controller.shutdown();
  138. System.exit(-3);
  139. }
  140. // 添加一个Hook,用来关闭BrokerController
  141. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  142. private volatile boolean hasShutdown = false;
  143. private AtomicInteger shutdownTimes = new AtomicInteger(0);
  144. @Override
  145. public void run() {
  146. synchronized (this) {
  147. log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
  148. if (!this.hasShutdown) {
  149. this.hasShutdown = true;
  150. long beginTime = System.currentTimeMillis();
  151. controller.shutdown();
  152. long consumingTimeTotal = System.currentTimeMillis() - beginTime;
  153. log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
  154. }
  155. }
  156. }
  157. }, "ShutdownHook"));
  158. return controller;
  159. } catch (Throwable e) {
  160. e.printStackTrace();
  161. System.exit(-1);
  162. }
  163. return null;
  164. }

controller.start()

  1. public void start() throws Exception {
  2. // 启动消息持久化,启动Netty服务等等等等,一堆启动
  3. if (this.messageStore != null) {
  4. this.messageStore.start();
  5. }
  6. if (this.remotingServer != null) {
  7. this.remotingServer.start();
  8. }
  9. if (this.fastRemotingServer != null) {
  10. this.fastRemotingServer.start();
  11. }
  12. if (this.fileWatchService != null) {
  13. this.fileWatchService.start();
  14. }
  15. //
  16. if (this.brokerOuterAPI != null) {
  17. this.brokerOuterAPI.start();
  18. }
  19. if (this.pullRequestHoldService != null) {
  20. this.pullRequestHoldService.start();
  21. }
  22. if (this.clientHousekeepingService != null) {
  23. this.clientHousekeepingService.start();
  24. }
  25. if (this.filterServerManager != null) {
  26. this.filterServerManager.start();
  27. }
  28. if (!messageStoreConfig.isEnableDLegerCommitLog()) {
  29. startProcessorByHa(messageStoreConfig.getBrokerRole());
  30. handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
  31. this.registerBrokerAll(true, false, true);
  32. }
  33. // 定时线程池,定时将Broker注册到Namesrv上
  34. // 通过BrokerOuterAPI,与Namesrv交互,注册Broker
  35. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  36. @Override
  37. public void run() {
  38. try {
  39. BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
  40. } catch (Throwable e) {
  41. log.error("registerBrokerAll Exception", e);
  42. }
  43. }
  44. }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
  45. if (this.brokerStatsManager != null) {
  46. this.brokerStatsManager.start();
  47. }
  48. // 定时清理过期的请求
  49. if (this.brokerFastFailure != null) {
  50. this.brokerFastFailure.start();
  51. }
  52. }

controller.start() 执行完成后,打印启动日志,Broker 的启动流程到此结束。