1):找到NameServer的启动类:org.apache.rocketmq.namesrv.NamesrvStartup。

  1. //创建NameServerController
  2. NamesrvController controller = createNamesrvController(args);
  3. //启动controller
  4. start(controller);
  5. //打印成功消息
  6. String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
  7. log.info(tip);
  8. System.out.printf("%s%n", tip);

2):createNamesrvController(args);

  1. //创建NameServer的配置类
  2. final NamesrvConfig namesrvConfig = new NamesrvConfig();
  3. //创建Netty的服务配置类
  4. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  5. //设定默认监听端口为:9876
  6. nettyServerConfig.setListenPort(9876);
  7. //读取加载配置变量或配置文件
  8. if (commandLine.hasOption('c')) {
  9. String file = commandLine.getOptionValue('c');
  10. if (file != null) {
  11. InputStream in = new BufferedInputStream(new FileInputStream(file));
  12. properties = new Properties();
  13. properties.load(in);
  14. MixAll.properties2Object(properties, namesrvConfig);
  15. MixAll.properties2Object(properties, nettyServerConfig);
  16. namesrvConfig.setConfigStorePath(file);
  17. System.out.printf("load config properties file OK, %s%n", file);
  18. in.close();
  19. }
  20. }
  21. ../后面部分代码忽略
  22. //创建NameServer的controller
  23. final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
  24. //注册配置
  25. controller.getConfiguration().registerConfig(properties);

NamesrvConfig类属性

  1. //RocketMQ的主目录,如:ROCKETMQ_HOME=D:\rocketmq
  2. private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
  3. //NameServer存储KV配置属性的持久化路径
  4. private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
  5. //NameServer的默认配置路径
  6. private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
  7. //生产环境名称:默认center
  8. private String productEnvName = "center";
  9. //集群测试,默认false
  10. private boolean clusterTest = false;
  11. //是否开启顺序消息:默认false
  12. private boolean orderMessageEnable = false;

NettyServerConfig类属性

  1. //监听端口:8888
  2. private int listenPort = 8888;
  3. //Netty业务工作线程数:默认8
  4. private int serverWorkerThreads = 8;
  5. //Netty 公共任务线程个数:默认0。Netty会根据业务类型(消息发送,消息消费,心跳检测等)
  6. 创建不同的线程池。如果该业务类型(RequestCode)没有线程池,则由公共任务线程处理。
  7. private int serverCallbackExecutorThreads = 0;
  8. //Selector线程个数:默认3。Selector主要用于接收解析请求,返回响应的操作。
  9. private int serverSelectorThreads = 3;
  10. //发送OneWay消息并发数:默认256
  11. private int serverOnewaySemaphoreValue = 256;
  12. //异步消息并发数:默认64
  13. private int serverAsyncSemaphoreValue = 64;
  14. //最大网络连接空闲时间,默认120s。连接超过该时间后,连接将会关闭
  15. private int serverChannelMaxIdleTimeSeconds = 120;
  16. //网络socket发送缓存大小:默认65535b=16kb,
  17. private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
  18. //网络socket接收缓存大小:默认65535b=16kb
  19. private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
  20. //ByteBuffer是否开启缓存,默认开启
  21. private boolean serverPooledByteBufAllocatorEnable = true;
  22. //是否启动Epoll I/O模型,建议Linux环境开启
  23. private boolean useEpollNativeSelector = false;

3):start(controller);

  1. //初始化controller
  2. boolean initResult = controller.initialize();
  3. //初始化失败,controller停止,释放资源
  4. if (!initResult) {
  5. controller.shutdown();
  6. System.exit(-3);
  7. }
  8. //检测JVM进程关闭事件,释放资源
  9. Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
  10. @Override
  11. public Void call() throws Exception {
  12. //controller停止,释放资源
  13. controller.shutdown();
  14. return null;
  15. }
  16. }));
  17. //启动controller
  18. controller.start();

3.1) controller.initialize();

  1. //KV配置管理加载
  2. this.kvConfigManager.load();
  3. //Netty远程服务器
  4. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
  5. this.brokerHousekeepingService);
  6. //公共线程执行器
  7. this.remotingExecutor =
  8. Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
  9. new ThreadFactoryImpl("RemotingExecutorThread_"));
  10. //注册进程
  11. this.registerProcessor();
  12. //扫描非活跃Broker(检测心跳,超过120s后剔除Broker),10s一次,初始化时5s后开始。
  13. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  14. @Override
  15. public void run() {
  16. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  17. }
  18. }, 5, 10, TimeUnit.SECONDS);
  19. //打印所有KV配置信息,10s一次,初始化时1s后执行。
  20. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  21. @Override
  22. public void run() {
  23. NamesrvController.this.kvConfigManager.printAllPeriodically();
  24. }
  25. }, 1, 10, TimeUnit.MINUTES);

3.2): controller.start();

  1. //启动
  2. this.remotingServer.start();
  3. if (this.fileWatchService != null) {
  4. this.fileWatchService.start();
  5. }