上一篇分析了 NameServer 的启动,既然 NameServer 已经启动了,那么我们知道了有一个 Netty 服务器正在监听端口号,等待接收连接请求,接下来分析下 Broker 是如何启动的。

mqbroker 启动脚本

// TODO

BrokerStartup 启动类

Broker 的 JVM 进程启动后,会执行 BrokerStartup 的 main() 方法,去看下 BrokerStartup 所在的位置,在 Broker 模块下。
image.png
看下其中的 main 方法

  1. public static void main(String[] args) {
  2. start(createBrokerController(args));
  3. }

和 NamesrvStartup 的启动代码类似,都是先创建了一个 Controller 核心组件,然后调用 start 方法去启动这个 Controller 组件。

createBrokerController 方法

先来看下 createBrokerController 方法,接下来分段分析其中的代码。

  1. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  2. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {
  3. NettySystemConfig.socketSndbufSize = 131072;
  4. }
  5. if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {
  6. NettySystemConfig.socketRcvbufSize = 131072;
  7. }

这段代码,一开始设置了一个系统变量,然后设置了两个 Netty 网络通信相关的变量,就是 socket 发送和接收缓冲区的大小。接着往下看。

解析命令行参数

  1. Options options = ServerUtil.buildCommandlineOptions(new Options());
  2. commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
  3. new PosixParser());
  4. if (null == commandLine) {
  5. System.exit(-1);
  6. }

这段代码在 NameServer 类里也看到过类似的,就是用来解析通过命令行给 broker 传递的一些参数的,这些参数在 main() 方法里通过 args 传递进来,然后通过 ServerUtil.parseCmdLine() 方法进行解析。

初始化核心配置类

接着往下看,会看到一段关键的代码。

  1. // 初始化 Broker、NettyServer、NettyClient 的配置
  2. final BrokerConfig brokerConfig = new BrokerConfig();
  3. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  4. final NettyClientConfig nettyClientConfig = new NettyClientConfig();
  5. // 设置 Netty 客户端是否使用 TLS 加密机制
  6. nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
  7. String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
  8. // 设置 Netty 服务端监听的端口号
  9. nettyServerConfig.setListenPort(10911);
  10. // 初始化 Broker 用来存储消息的一些配置信息
  11. final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
  12. // 如果当前的 Broker 节点是 SLAVE 的话,设置默认值
  13. if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
  14. int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
  15. messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
  16. }

上述的代码和 NameServer 类似,Broker 在启动的时候也是初始化了几个核心配置类,包括了 Broker 自己的配置,Broker 作为一个 Netty 服务器的配置、Broker 作为一个 Netty 客户端的配置和 Broker 消息存储的配置。

  • 当我们的 Producer 连接到 Broker 上发送消息的时候,那么 Broker 就是一个 Netty 服务器,负责监听客户端的连接请求。
  • 当 Broker 和 NameServer 建立连接的时候,此时 Broker 又是一个 Netty 客户端,需要和 NameServer 的 Netty 服务端建立连接。

通过上述分析,可以得出下图,包含了 Broker 的 几个核心配置组件。
image.png

解析配置类并填充信息

接下看下面的代码,看看是如何解析核心配置类并填充信息的。

  1. if (commandLine.hasOption('c')) {
  2. String file = commandLine.getOptionValue('c');
  3. if (file != null) {
  4. configFile = file;
  5. InputStream in = new BufferedInputStream(new FileInputStream(file));
  6. properties = new Properties();
  7. properties.load(in);
  8. properties2SystemEnv(properties);
  9. MixAll.properties2Object(properties, brokerConfig);
  10. MixAll.properties2Object(properties, nettyServerConfig);
  11. MixAll.properties2Object(properties, nettyClientConfig);
  12. MixAll.properties2Object(properties, messageStoreConfig);
  13. BrokerPathConfigHelper.setBrokerConfigPath(file);
  14. in.close();
  15. }
  16. }

上面的代码就是如果在启动 Broker 的时候,用 -c 选项带了一个配置文件的地址,此时他会读取配置文件里了定义的一些配置信息,然后解析后覆盖到对应的 4 个配置类中去。
之前通过源码启动 启动 Broker 的时候,我们自定义了配置文件,然后通过 -c 带上了配置文件的地址,就是在上述代码中,他会读取我们自定义的配置文件,填充到他的配置类中去。
继续看后面代码,看看如何解析和填充配置的。

  1. // 解析命令行中的参数填充到 BrokerConfig 配置对象中去
  2. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);
  3. // 如果没有 ROCKETMQ_HOME 环境变量,则启动失败
  4. if (null == brokerConfig.getRocketmqHome()) {
  5. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV);
  6. System.exit(-2);
  7. }
  8. // 将 BrokerConfig 中 NameServer 地址解析出来
  9. String namesrvAddr = brokerConfig.getNamesrvAddr();
  10. if (null != namesrvAddr) {
  11. try {
  12. String[] addrArray = namesrvAddr.split(";");
  13. for (String addr : addrArray) {
  14. RemotingUtil.string2SocketAddress(addr);
  15. }
  16. } catch (Exception e) {
  17. System.out.printf(
  18. "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n",
  19. namesrvAddr);
  20. System.exit(-3);
  21. }
  22. }
  23. // 根据不同的 Broker 角色作处理
  24. switch (messageStoreConfig.getBrokerRole()) {
  25. case ASYNC_MASTER:
  26. case SYNC_MASTER:
  27. brokerConfig.setBrokerId(MixAll.MASTER_ID);
  28. break;
  29. case SLAVE:
  30. if (brokerConfig.getBrokerId() <= 0) {
  31. System.out.printf("Slave's brokerId must be > 0");
  32. System.exit(-3);
  33. }
  34. break;
  35. default:
  36. break;
  37. }
  38. // 判断是否是基于 dLeger 技术来管理主从同步和 CommitLog 的
  39. if (messageStoreConfig.isEnableDLegerCommitLog()) {
  40. // 如果是的话,就将 brokerId 设置为 -1
  41. brokerConfig.setBrokerId(-1);
  42. }
  43. // 设置 HA 监听端口号
  44. messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
  45. // 日志相关
  46. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  47. JoranConfigurator configurator = new JoranConfigurator();
  48. configurator.setContext(lc);
  49. lc.reset();
  50. configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml");
  51. // -p 表示启动时打印配置参数
  52. if (commandLine.hasOption('p')) {
  53. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  54. MixAll.printObjectProperties(console, brokerConfig);
  55. MixAll.printObjectProperties(console, nettyServerConfig);
  56. MixAll.printObjectProperties(console, nettyClientConfig);
  57. MixAll.printObjectProperties(console, messageStoreConfig);
  58. System.exit(0);
  59. }
  60. // 和 -p 一样,也是打印配置参数,不过是打印加 @ImportantField 注解的
  61. else if (commandLine.hasOption('m')) {
  62. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);
  63. MixAll.printObjectProperties(console, brokerConfig, true);
  64. MixAll.printObjectProperties(console, nettyServerConfig, true);
  65. MixAll.printObjectProperties(console, nettyClientConfig, true);
  66. MixAll.printObjectProperties(console, messageStoreConfig, true);
  67. System.exit(0);
  68. }
  69. // 不管怎样,都会打印 Broker 的配置参数
  70. log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
  71. MixAll.printObjectProperties(log, brokerConfig);
  72. MixAll.printObjectProperties(log, nettyServerConfig);
  73. MixAll.printObjectProperties(log, nettyClientConfig);
  74. MixAll.printObjectProperties(log, messageStoreConfig);

创建 BrokerController

上述代码已经对核心配置类进行了填充,接下来就是用这些配置去构建 BrokerController,接着往下看,通过 4 个配置类来创建了一个 BrokerController 对象。

  1. final BrokerController controller = new BrokerController(
  2. brokerConfig,
  3. nettyServerConfig,
  4. nettyClientConfig,
  5. messageStoreConfig);
  6. // remember all configs to prevent discard
  7. controller.getConfiguration().registerConfig(properties);

为什么叫 BrokerController

Broker 启动的时候,main class 是 BrokerStartup 这个类,这个类名,顾名思义,就是用来启动 Broker 的一个类,他里面包含的是把 Broker 给进行初始化和完成全部启动工作的逻辑。

BrokerStartup 自己并不能代表一个 Broker,所以其中最核心的组件就是 BrokerController,这个 BrokerController 和在 SpringMVC 中的 Controller 是同一个意思,负责接收和处理请求。

BrokerController 的中文可以翻译成“Broker 管理控制组件”,表示这个组件被创建出来以及初始化之后就是用来控制当前正在运行的这个 Broker 的,所以英文名叫 BrokerController,也正是因为如此,当我们用 mqbroker 脚本启动的 JVM 进程,可以认为就是一个 Broker,这里的 Broker 实际上应该是代表了一个 JVM 进程的概念,而不是任何一个代码组件。

然后 BrokerStartup 作为一个 main class,其实是属于一个代码组件,作用是准备好核心配置组件,然后就是创建、初始化以及启动 BrokerController 这个核心组件,也就是启动一个 Broker 管理控制组件,让 BrokerController 去控制和管理 Broker 这个 JVM 进程运行过程中的一切行为,包括接收网络请求、包括管理磁盘上的消息数据,以及一大堆后台线程的运行。完善下上方的那个图。
image.png
上面的图中,把几者之间的关系,说的很清晰了,Broker 这个概念本身代表的不是一个代码组件,他就是你用 mqbroker 脚本启动的 JVM 进程。然后 JVM 进程的 main class 是 BrokerStartup,他是一个启动组件,然后负责初始化核心配置组件,然后基于核心配置组件去启动 BrokerController 这个管控组件。
然后在 Broker 这个 JVM 进程运行期间,都是由 BrokerController 这个管控组件去管理 Broker 的请求处理、后台线程以及磁盘数据。

BrokerController 构造方法

先初步的看下 BrokerController 的构造方法。

  1. public BrokerController(
  2. final BrokerConfig brokerConfig,
  3. final NettyServerConfig nettyServerConfig,
  4. final NettyClientConfig nettyClientConfig,
  5. final MessageStoreConfig messageStoreConfig) {
  6. // 保存 4 个核心配置
  7. this.brokerConfig = brokerConfig;
  8. this.nettyServerConfig = nettyServerConfig;
  9. this.nettyClientConfig = nettyClientConfig;
  10. this.messageStoreConfig = messageStoreConfig;
  11. // 初始化一堆 Broker 各种功能对应的组件
  12. // 管理 consumer 消息 offset
  13. this.consumerOffsetManager = new ConsumerOffsetManager(this);
  14. // 管理 topic 配置
  15. this.topicConfigManager = new TopicConfigManager(this);
  16. // 处理 consumer pull 消息
  17. this.pullMessageProcessor = new PullMessageProcessor(this);
  18. this.pullRequestHoldService = new PullRequestHoldService(this);
  19. this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
  20. this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
  21. this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
  22. this.consumerFilterManager = new ConsumerFilterManager(this);
  23. this.producerManager = new ProducerManager();
  24. this.clientHousekeepingService = new ClientHousekeepingService(this);
  25. this.broker2Client = new Broker2Client(this);
  26. this.subscriptionGroupManager = new SubscriptionGroupManager(this);
  27. this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
  28. this.filterServerManager = new FilterServerManager(this);
  29. this.slaveSynchronize = new SlaveSynchronize(this);
  30. // 初始化一堆线程池的队列
  31. this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
  32. this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
  33. this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
  34. this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
  35. this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
  36. this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
  37. this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
  38. this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
  39. // 初始化一些组件
  40. // 管理 metric 统计的
  41. this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
  42. this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
  43. // 处理 broker 故障的
  44. this.brokerFastFailure = new BrokerFastFailure(this);
  45. this.configuration = new Configuration(
  46. log,
  47. BrokerPathConfigHelper.getBrokerConfigPath(),
  48. this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
  49. );
  50. }

看了上方的构造方法,主要知道了 BrokerController 内部是有一系列的功能性组件的,还有一大堆后台线程池。如下图。
image.png

初始化 BrokerController

现在 BrokerController 创建好了