1):找到NameServer的启动类:org.apache.rocketmq.namesrv.NamesrvStartup。
//创建NameServerControllerNamesrvController controller = createNamesrvController(args);//启动controllerstart(controller);//打印成功消息String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();log.info(tip);System.out.printf("%s%n", tip);
2):createNamesrvController(args);
//创建NameServer的配置类final NamesrvConfig namesrvConfig = new NamesrvConfig();//创建Netty的服务配置类final NettyServerConfig nettyServerConfig = new NettyServerConfig();//设定默认监听端口为:9876nettyServerConfig.setListenPort(9876);//读取加载配置变量或配置文件if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } }../后面部分代码忽略//创建NameServer的controllerfinal NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);//注册配置controller.getConfiguration().registerConfig(properties);
NamesrvConfig类属性
//RocketMQ的主目录,如:ROCKETMQ_HOME=D:\rocketmqprivate String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));//NameServer存储KV配置属性的持久化路径private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";//NameServer的默认配置路径private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";//生产环境名称:默认centerprivate String productEnvName = "center";//集群测试,默认falseprivate boolean clusterTest = false;//是否开启顺序消息:默认falseprivate boolean orderMessageEnable = false;
NettyServerConfig类属性
//监听端口:8888private int listenPort = 8888;//Netty业务工作线程数:默认8private int serverWorkerThreads = 8;//Netty 公共任务线程个数:默认0。Netty会根据业务类型(消息发送,消息消费,心跳检测等)创建不同的线程池。如果该业务类型(RequestCode)没有线程池,则由公共任务线程处理。private int serverCallbackExecutorThreads = 0;//Selector线程个数:默认3。Selector主要用于接收解析请求,返回响应的操作。private int serverSelectorThreads = 3;//发送OneWay消息并发数:默认256private int serverOnewaySemaphoreValue = 256;//异步消息并发数:默认64private int serverAsyncSemaphoreValue = 64;//最大网络连接空闲时间,默认120s。连接超过该时间后,连接将会关闭private int serverChannelMaxIdleTimeSeconds = 120;//网络socket发送缓存大小:默认65535b=16kb,private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;//网络socket接收缓存大小:默认65535b=16kbprivate int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;//ByteBuffer是否开启缓存,默认开启private boolean serverPooledByteBufAllocatorEnable = true;//是否启动Epoll I/O模型,建议Linux环境开启private boolean useEpollNativeSelector = false;
3):start(controller);
//初始化controllerboolean initResult = controller.initialize();//初始化失败,controller停止,释放资源if (!initResult) { controller.shutdown(); System.exit(-3);}//检测JVM进程关闭事件,释放资源Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { //controller停止,释放资源 controller.shutdown(); return null; }}));//启动controllercontroller.start();
3.1) controller.initialize();
//KV配置管理加载this.kvConfigManager.load();//Netty远程服务器this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);//公共线程执行器this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));//注册进程this.registerProcessor();//扫描非活跃Broker(检测心跳,超过120s后剔除Broker),10s一次,初始化时5s后开始。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); }}, 5, 10, TimeUnit.SECONDS);//打印所有KV配置信息,10s一次,初始化时1s后执行。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); }}, 1, 10, TimeUnit.MINUTES);
3.2): controller.start();
//启动this.remotingServer.start();if (this.fileWatchService != null) { this.fileWatchService.start();}