1):找到NameServer的启动类:org.apache.rocketmq.namesrv.NamesrvStartup。
//创建NameServerController
NamesrvController controller = createNamesrvController(args);
//启动controller
start(controller);
//打印成功消息
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
2):createNamesrvController(args);
//创建NameServer的配置类
final NamesrvConfig namesrvConfig = new NamesrvConfig();
//创建Netty的服务配置类
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
//设定默认监听端口为:9876
nettyServerConfig.setListenPort(9876);
//读取加载配置变量或配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
../后面部分代码忽略
//创建NameServer的controller
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
//注册配置
controller.getConfiguration().registerConfig(properties);
NamesrvConfig类属性
//RocketMQ的主目录,如:ROCKETMQ_HOME=D:\rocketmq
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
//NameServer存储KV配置属性的持久化路径
private String kvConfigPath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "kvConfig.json";
//NameServer的默认配置路径
private String configStorePath = System.getProperty("user.home") + File.separator + "namesrv" + File.separator + "namesrv.properties";
//生产环境名称:默认center
private String productEnvName = "center";
//集群测试,默认false
private boolean clusterTest = false;
//是否开启顺序消息:默认false
private boolean orderMessageEnable = false;
NettyServerConfig类属性
//监听端口:8888
private int listenPort = 8888;
//Netty业务工作线程数:默认8
private int serverWorkerThreads = 8;
//Netty 公共任务线程个数:默认0。Netty会根据业务类型(消息发送,消息消费,心跳检测等)
创建不同的线程池。如果该业务类型(RequestCode)没有线程池,则由公共任务线程处理。
private int serverCallbackExecutorThreads = 0;
//Selector线程个数:默认3。Selector主要用于接收解析请求,返回响应的操作。
private int serverSelectorThreads = 3;
//发送OneWay消息并发数:默认256
private int serverOnewaySemaphoreValue = 256;
//异步消息并发数:默认64
private int serverAsyncSemaphoreValue = 64;
//最大网络连接空闲时间,默认120s。连接超过该时间后,连接将会关闭
private int serverChannelMaxIdleTimeSeconds = 120;
//网络socket发送缓存大小:默认65535b=16kb,
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
//网络socket接收缓存大小:默认65535b=16kb
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
//ByteBuffer是否开启缓存,默认开启
private boolean serverPooledByteBufAllocatorEnable = true;
//是否启动Epoll I/O模型,建议Linux环境开启
private boolean useEpollNativeSelector = false;
3):start(controller);
//初始化controller
boolean initResult = controller.initialize();
//初始化失败,controller停止,释放资源
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
//检测JVM进程关闭事件,释放资源
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
//controller停止,释放资源
controller.shutdown();
return null;
}
}));
//启动controller
controller.start();
3.1) controller.initialize();
//KV配置管理加载
this.kvConfigManager.load();
//Netty远程服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig,
this.brokerHousekeepingService);
//公共线程执行器
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(),
new ThreadFactoryImpl("RemotingExecutorThread_"));
//注册进程
this.registerProcessor();
//扫描非活跃Broker(检测心跳,超过120s后剔除Broker),10s一次,初始化时5s后开始。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
//打印所有KV配置信息,10s一次,初始化时1s后执行。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
3.2): controller.start();
//启动
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}