因为 RocketMQ 要运行的话,必须先启动 NameServer,因为后续 Broker 启动的时候,是要向 NameServer 注册的,然后 Producer 发送消息的时候,需要从 NameServer 获取 Broker 地址,才能发送到 Broker 去。所以先从 NameServer 启动这个场景开始分析 RocketMQ 源码。
mqnamesrv 启动脚本
NameServer 的启动时通过 RocketMQ 源码中 distribution 模块 bin 目录中的 mqnamesrv 这个脚本启动的,这个脚本中有一行关键的命令用于启动 NameServer 进程,如下所示。sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.namesrv.NamesrvStartup $@
可以看到,这个命令用 sh 命令执行了 runserver.sh 这个脚本,然后通过这个脚本去启动了 NamesrvStartup 这个 Java 类,runserver.sh 这个脚本中关键的启动 NamesrvStartup 类的这个命令如下。
// TODO
可以把些命令简化类似这样的一行命令:
// TODO
通过 java 命令 + 一个有 main() 方法的类,就是会启动一个 JVM 进程,通过这个 JVM 进程来执行 NamesrvStartup 类中的 main() 方法,在这个 main() 中就完成 NameServer 启动的所有流程和工作,除此之外命令上的的一些参数都是 JVM 相关的配置参数。
因此,使用 mqnamesrv 脚本启动 NameServer 的时候,本质上就是基于 java 命令启动了一个 JVM 进程,执行 NamesrvStartup 类中的 main() 方法,完成 NameServer 启动的全部流程和逻辑,同时启动 NameServer 这个 JVM 进程的时候,有一大堆的默认 JVM 参数,我们也可以在这里修改这些JVM参数,甚至进行优化。
将上面的流程通过流程图来表示如下所示:
NamesrvStartup 启动类
前面的启动脚本已经了解了,接下来进入 Java 代码,分析下 NamesrvStartup 这个启动类。
先来看下下面的启动类部分代码
public class NamesrvStartup {
// 日志、配置、命令行相关
private static InternalLogger log;
private static Properties properties = null;
private static CommandLine commandLine = null;
/**
* 首先执行的方法,调用了 main0 方法
* @param args
*/
public static void main(String[] args) {
main0(args);
}
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
...... 省略
}
启动类启动时首先会调用 main 方法,而 main 方法调用了 main0 方法。所以主要就是对 main0 方法进行分析。
createNamesrvController 方法
看下这么一行代码
NamesrvController controller = createNamesrvController(args);
这行代码创建了一个 NamesrvController 类,去看下 createNamesrvController 方法,该方法很长,分段进行讨论。这一段主要是跟 CommandLine 相关代,不是重点,直接略过。
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
//PackageConflictDetect.detectFastjson();
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
...
两个关键的配置类 NamesrvConfig 和 NettyServerConfig。NamesrvConfig 包含的是自身运行的一些配置参数,NettyServerConfig 包含的是用于接收网络请求的 Netty 服务器的配置参数。从这里可以看出,nameServer 对外接收 Broker 和客户端的网络请求的时候,底层应该是基于 Netty 实现网络通信的。
final NamesrvConfig namesrvConfig = new NamesrvConfig();
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
nettyServerConfig.setListenPort(9876);
nettyServerConfig.setListenPort(9876);
这行代码可以知道 NameServer 默认固定的监听请求端口号是 9876,直接写死端口号再代码里,所以 NettyServer 应该就是监听了 9876 这个端口号来接受 Broker 和客户端的请求的。基于 Netty 实现的服务器用于接收网络请求如下图所示。
接下来看下 NameServer 的两个核心配置类的代码片段
NamesrvConfig
核心配置如下所示
public class NamesrvConfig {
/**
* RocketMQ 主目录地址,尝试获取 ROCKETMQ_HOME 这个环境变量的值
*/
private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
System.getenv(MixAll.ROCKETMQ_HOME_ENV));
/**
* NameServer 存放 kv 配置属性的地址
*/
private String kvConfigPath = System.getProperty("user.home")
+ File.separator
+ "namesrv"
+ File.separator
+ "kvConfig.json";
/**
* NameServer 的配置存储路径
*/
private String configStorePath = System.getProperty("user.home")
+ File.separator
+ "namesrv"
+ File.separator
+ "namesrv.properties";
/**
* 生产环境名称,默认 center
*/
private String productEnvName = "center";
/**
* 是否启动了 clusterTest 测试集群,默认是 false
*/
private boolean clusterTest = false;
/**
* 是否是支持有序消息,默认是 false,不支持
*/
private boolean orderMessageEnable = false;
}
NettyServerConfig
NettyServerConfig 中的参数就是用来配置 NettyServer 的,配置号后可以监听 9876 端口号,然后等 Broker 和客户端有请求过来,就可以处理了。
public class NettyServerConfig implements Cloneable {
/**
* NettyServer 默认监听端口号,不过被 NameServer 改成了 9876
*/
private int listenPort = 8888;
/**
* NettyServer 工作线程的数量,默认 8
*/
private int serverWorkerThreads = 8;
/**
* Netty 的 Public 线程池的线程数量,默认是 0
*/
private int serverCallbackExecutorThreads = 0;
/**
* Netty 的 IO 线程池的线程数量,默认是 3,负责解析网络请求
* 当线程解析完网络请求后,会把请求转发给 wrok 线程来处理
*/
private int serverSelectorThreads = 3;
/**
* 这两个是 broker 端的参数
* broker 端在基于 netty 构建网络服务器的时候,会使用下面两个参数
*/
private int serverOnewaySemaphoreValue = 256;
private int serverAsyncSemaphoreValue = 64;
/**
* 如果一个网络连接空闲超过 120s,就会被关闭
*/
private int serverChannelMaxIdleTimeSeconds = 120;
/**
* socket send buffer 缓冲区以及 receive buffer 缓冲区的大小
*/
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
/**
* ByteBuffer 是否开启缓存,默认开启
*/
private boolean serverPooledByteBufAllocatorEnable = true;
/**
* 是否默认开启 epoll IO 模型,默认是不开启的
* make make install
*
*
* ../glibc-2.10.1/configure \ --prefix=/usr \ --with-headers=/usr/include \
* --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd
*/
private boolean useEpollNativeSelector = false;
}
解析配置
前面介绍了 NamesrvConfig 和 NettyServerConfig 的配置,接下来继续看 NamesrvController 的代码,看看是如何解析配置类的。
// 如果通过命令行启动时,带上了 -c 这个选项
// -c 的意思是带上一个配置文件的地址,然后会读取这个配置文件的内容
if (commandLine.hasOption('c')) {
// 读取配置文件的名称
String file = commandLine.getOptionValue('c');
if (file != null) {
// 基于输入流从配置文件中读取配置
InputStream in = new BufferedInputStream(new FileInputStream(file));
properties = new Properties();
properties.load(in);
// 基于工具类,把读取到的配置文件解析到两个核心配置类中
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// 设置配置文件存储路径
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
如果在启动 NameServer 的时候,在命令行带上了 -c 参数,则启动的时候就会解析 -c 后的配置文件然后放入上面的两个核心配置类中,然后核心配置类中的属性就会被覆盖。接下来看下剩余的配置相关的代码。
// 如果通过命令行启动时带了 —p 选项
// p 的意思时 print,就是打印出 NameServer 的所有配置信息
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 将命令行中所有的配置参数都读取出来覆盖到 namesrvConfig 中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 如果 ROCKETMQ_HOME 是空的,就会打印日志
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 日志、配置相关
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
// 打印 NameServer 配置信息
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
查看启动日志
刚刚分析了 NameServer 启动的时候会初始化和解析一些核心配置信息,如 NettyServer 的一些网络配置信息,然后初始化完毕配置信息后,就会打印这些配置信息,我们再来看下 NameServer 的启动日志,会看到如下内容,这些就是 NamesrvConfig 和 NettyServerConfig 的核心配置信息。
2021-10-19 20:08:49 INFO main - rocketmqHome=D:\Java\RocketMQ\NameServer
2021-10-19 20:08:49 INFO main - kvConfigPath=C:\Users\hncboy\namesrv\kvConfig.json
2021-10-19 20:08:49 INFO main - configStorePath=C:\Users\hncboy\namesrv\namesrv.properties
2021-10-19 20:08:49 INFO main - productEnvName=center
2021-10-19 20:08:49 INFO main - clusterTest=false
2021-10-19 20:08:49 INFO main - orderMessageEnable=false
2021-10-19 20:08:49 INFO main - listenPort=9876
2021-10-19 20:08:49 INFO main - serverWorkerThreads=8
2021-10-19 20:08:49 INFO main - serverCallbackExecutorThreads=0
2021-10-19 20:08:49 INFO main - serverSelectorThreads=3
2021-10-19 20:08:49 INFO main - serverOnewaySemaphoreValue=256
2021-10-19 20:08:49 INFO main - serverAsyncSemaphoreValue=64
2021-10-19 20:08:49 INFO main - serverChannelMaxIdleTimeSeconds=120
2021-10-19 20:08:49 INFO main - serverSocketSndBufSize=65535
2021-10-19 20:08:49 INFO main - serverSocketRcvBufSize=65535
2021-10-19 20:08:49 INFO main - serverPooledByteBufAllocatorEnable=true
2021-10-19 20:08:49 INFO main - useEpollNativeSelector=false
2021-10-19 20:08:51 INFO main - Server is running in TLS permissive mode
2021-10-19 20:08:51 INFO main - Tls config file doesn't exist, skip it
2021-10-19 20:08:51 INFO main - Log the final used tls related configuration
2021-10-19 20:08:51 INFO main - tls.test.mode.enable = true
2021-10-19 20:08:51 INFO main - tls.server.need.client.auth = none
2021-10-19 20:08:51 INFO main - tls.server.keyPath = null
2021-10-19 20:08:51 INFO main - tls.server.keyPassword = null
2021-10-19 20:08:52 INFO main - tls.server.certPath = null
2021-10-19 20:08:52 INFO main - tls.server.authClient = false
2021-10-19 20:08:52 INFO main - tls.server.trustCertPath = null
2021-10-19 20:08:52 INFO main - tls.client.keyPath = null
2021-10-19 20:08:52 INFO main - tls.client.keyPassword = null
2021-10-19 20:08:52 INFO main - tls.client.certPath = null
2021-10-19 20:08:52 INFO main - tls.client.authServer = false
2021-10-19 20:08:52 INFO main - tls.client.trustCertPath = null
2021-10-19 20:08:52 INFO main - Using JDK SSL provider
2021-10-19 20:08:54 INFO main - SSLContext created for server
2021-10-19 20:08:56 INFO main - Try to start service thread:FileWatchService started:false lastThread:null
2021-10-19 20:08:56 INFO FileWatchService - FileWatchService service started
2021-10-19 20:08:56 INFO NettyEventExecutor - NettyEventExecutor service started
2021-10-19 20:08:56 INFO main - The Name Server boot success. serializeType=JSON
创建 NamesrcController
createNamesrvController 方法的最后一步了,就是直接传入 namesrvConfig 和 nettyServerConfig 两个核心配置类,然后 new 了一个 NamesrvController。
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 保存配置信息
controller.getConfiguration().registerConfig(properties);
return controller;
}
看下图的图示,可以看到箭头的指向,两个核心配置类在初始化完毕后,都是交给 NamesrvController 这个核心组件的。
来看下 NamesrvController 的构造方法,其中基本就是给一堆变量赋值,构造了一些对象,如 KVConfigManager、RouteInfoManager
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
我们了解到,NamesrvConfig 配置里其实没啥东西,最主要的还是 NettyServerConfig 里包含的一些网络通信的参数,它们都有一些默认值。此时虽然创建出了 NamesrvController 对象,但是最核心的 Netty 服务的启动还没有开始,只有 Netty 服务启动了,NameServer 才能在 9876 端口上接收 Broker 和客户端的网络请求,比如 Broker 注册自己,客户端拉取 Broker 路由数据等等。
start 方法
通过上述的 createNamesrcController 方法,创建出来了一个 NamesrcController 对象,但是仅仅创建这么一个对象,肯定是不够的,后续肯定要启动这个 NamesrcController 对象。接着回到 main0 方法中。
public static NamesrvController main0(String[] args) {
try {
NamesrvController controller = createNamesrvController(args);
start(controller);
String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
log.info(tip);
System.out.printf("%s%n", tip);
return controller;
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
}
return null;
}
在上方的 main0 方法中,当 NamesrcController 对象被创建出来后,调用了 start 方法取启动。接下来去看 start 这个方法干了什么。
public static NamesrvController start(final NamesrvController controller) throws Exception {
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// NamesrvController 初始化
boolean initResult = controller.initialize();
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 通过 Runtime 类注册了一个 JVM 关闭时候的 shutdown 钩子,就是 JVM 关闭时会执行上述注册的回调函数
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动 NamesrvController
controller.start();
return controller;
}
initialize 方法
先来看下 controller.initialize();
方法,该方法就是对 NamesrvController 执行了 initialize 初始化操作。看下 NamesrvController 类的 initialize 方法代码。
public boolean initialize() {
// 加载 kv 配置
this.kvConfigManager.load();
// 初始化 Netty 服务器
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 初始化 Netty 服务器的工作线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 将工作线程池赋给 Netty 服务器
this.registerProcessor();
// 启动了后台线程执行定任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 每隔 10s 检测哪些 Broker 没发送心跳
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 启动了后台线程执行定任务
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 每隔 10s 打印 kv 配置信息
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// FileWatch 相关的,目前不了解
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
其中一行关键代码:this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
,构建了一个 NettyRemotingServer 对象,也就是 Netty 网络服务器。此时关键的部分已经看到了,补充下上方的图。
初始化 NettyRemotingServer
接着就进到 NettyRemotingServer 构造方法中看下 Netty 服务器是怎样被初始化的。
public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
final ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
this.serverBootstrap = new ServerBootstrap();
... 省略
}
构造方法很长,截取最关键的一部分,上面的代码中最关键的一行代码是:this.serverBootstrap = new ServerBootstrap();
,这个 ServerBootstrap 类,是 Netty 里核心的一个类,代表了一个 Netty 网络服务器,通过这个东西,最终可以让 Netty 监听一个端口上的网络请求。此时再去完善下上面的图,NettyRemotingServer 是一个 Rocket MQ 自己开发的网络服务器组件,但是底层就是基于 Netty 的原始 API 实现的要给 NettyBootstrap,是用作真正的网络服务器的。
shutdown 方法
public void shutdown() {
this.remotingServer.shutdown();
this.remotingExecutor.shutdown();
this.scheduledExecutorService.shutdown();
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
}
在 JVM 关闭时会执行 shutdown 方法。其中就是关闭 NettyRemotingServer 释放网络资源,然后关闭 RemotingExecutor 就是释放 Netty 服务器工作线程池的资源,还有关闭 scheduledExecutorService 就是释放定时任务后台线程资源。
NamesrvController-start 方法
start 方法中的 controller.start();
方法是一行关键代码,之前的代码已经把 Netty 服务器初始化完毕了,但是还没有启动,没启动的话什么都干不了,此时就要执行一个启动方法,进入 NamesrvController 内部的 start 方法。
public void start() throws Exception {
this.remotingServer.start();
if (this.fileWatchService != null) {
this.fileWatchService.start();
}
}
这里很清晰了,这个 NamesrvController 启动,核心就是在启动 NettyRemotingServer,也就是 Netty 服务器,再进到这个 remotingServer.start() 方法里查看下代码。
NettyRemotingServer-start 方法
@Override
public void start() {
// // 初始化了线程池组
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
prepareSharableHandlers();
// 基于 ServerBootstrap 的 group 方法,对 Netty 服务器进行各种网络上的配置
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// 设置需要监听的端口号 9876
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 绑定并监听一个端口号启动 Netty 服务器
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}
当 Netty 服务器启动成功后,再来看下完善后的图。