一、环境搭建
1.1 环境要求
- JDK 1.8
- maven
-
1.2 源码下载
git clone https://github.com/apache/rocketmq.git
目录结构
broker: broker 模块(broke 启动进程)
- client :消息客户端,包含消息生产者、消息消费者相关类
- common :公共包
- dev :开发者信息(非源代码)
- distribution :部署实例文件夹(非源代码)
- example: RocketMQ 例代码
- filter :消息过滤相关基础类
- filtersrv:消息过滤服务器实现相关类(Filter启动进程)
- logappender:日志实现相关类
- namesrv:NameServer实现相关类(NameServer启动进程)
- openmessageing:消息开放标准
- remoting:远程通信模块,给予Netty
- srcutil:服务工具类
- store:消息存储实现相关类
- style:checkstyle相关实现
- test:测试相关类
- tools:工具类,监控命令相关实现类
使用idea打开项目
进行安装,在命令行运行如下命令
mvn clean package -DskipTests=true
1.3 调试准备
创建conf文件夹,从distribution下的conf目录下,复制 broker.conf、logback_broker.xml、logback_namesrv.xml 三个文件到新建的conf目录下,最终效果如图所示。
启动Namesrv
首先,先添加环境变量,如下图,添加名为 ROCKETMQ_HOME 的环境变量,值为当前的项目路径
如果不做如上的操作,运行NamesrvStartup的话,会报如下的错误
添加完环境变量之后,运行NamesrvStartup,看到控制台打印如下结果,即Namesrv运行成功The Name Server boot success. serializeType=JSON
启动Broker
首先,BrokerStartup中,也要像NamesrvStartup一样,添加ROCKETMQ_HOME这个环境变量,并且,需要指定broker.conf所在的位置。
然后再修改 调试准备阶段中,复制到conf目录下的broker.conf配置文件,完整的broker.conf内容如下。 ```shell brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH
namesrvAddr=127.0.0.1:9876 authCreateTopicEnable = true #开启自动创建topic的功能 storePathRootDir=/home/ifan/workspace/github-source/rocketmq/data enablePropertyFilter=true storePathCommitLog=/home/ifan/workspace/github-source/rocketmq/data/commitlog
看到如下的打印,证明broker启动成功
```shell
The broker[broker-a, 172.17.0.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
1.4 源码调试
生产者测试
使用rocketmq自带的测试用例进行测试(example Modules下的org.apache.rocketmq.example.quickstart.Producer),在其中添加namesrv的地址,运行main方法
消费者测试
使用rocketmq自带的测试用例进行测试(example Modules下的org.apache.rocketmq.example.quickstart.Consumer),在其中添加namesrv的地址,运行main方法
到这,源码的环境搭建就完成了
二、源码解析
2.1 Namesrv 源码分析
NamesrvStartup#main -> 调用main0
NamesrvStartup#main0
createNamesrvController(args) 第一步,创建NamesrvController
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
// 1. 设置 ROCKETMQ 的服务端版本
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
// 创建一个命令行的选项,用于mqnamesrv命令的一些选项输出
// 在方法内部,创建出 -h -n -c -p的选项
Options options = ServerUtil.buildCommandlineOptions(new Options());
commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
if (null == commandLine) {
System.exit(-1);
return null;
}
// 创建一个Namesrv的配置类
final NamesrvConfig namesrvConfig = new NamesrvConfig();
// 创建一个Netty服务端的配置
final NettyServerConfig nettyServerConfig = new NettyServerConfig();
// 设置Netty服务端的监听端口
nettyServerConfig.setListenPort(9876);
// 如果启动参数中,包含 -c参数,说明指定了配置文件
if (commandLine.hasOption('c')) {
String file = commandLine.getOptionValue('c');
if (file != null) {
// 指定了-c参数,就通过 InputStream读取这个配置文件的内容
InputStream in = new BufferedInputStream(new FileInputStream(file));
// 并将这个内容,存储到一个Properties文件中
properties = new Properties();
properties.load(in);
// 然后,通过反射,将properties文件中的内容,存储到上面创建的两个配置类中
MixAll.properties2Object(properties, namesrvConfig);
MixAll.properties2Object(properties, nettyServerConfig);
// 设置配置文件的路径
namesrvConfig.setConfigStorePath(file);
// 然后关闭文件流
System.out.printf("load config properties file OK, %s%n", file);
in.close();
}
}
// 有 -p 参数,打印可配置的参数,到console中,然后System.exit退出程序
if (commandLine.hasOption('p')) {
InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
MixAll.printObjectProperties(console, namesrvConfig);
MixAll.printObjectProperties(console, nettyServerConfig);
System.exit(0);
}
// 将命令行中的参数,设置到namesrvconfig中
MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
// 如果没有设置 ROCKETMQ_HOME 这个环境变量的话,就在这打印日志,然后报错异常退出
if (null == namesrvConfig.getRocketmqHome()) {
System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
System.exit(-2);
}
// 获取日志上下文
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
configurator.setContext(lc);
lc.reset();
// 设置日志的配置文件所在地
configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
// 获取日志对象
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
// 将这个配置,打印在日志文件中
MixAll.printObjectProperties(log, namesrvConfig);
MixAll.printObjectProperties(log, nettyServerConfig);
// 初始化NamesrvController,将两个配置类传入
final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// 将properties中的内容,以及Configuration中的内容进行合并,方法内部调用 merge
controller.getConfiguration().registerConfig(properties);
return controller;
}
new NamesrvController();
将传入的配置类啊,NamesrvController中的一些属性进行赋值以及创建
public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
this.namesrvConfig = namesrvConfig;
this.nettyServerConfig = nettyServerConfig;
this.kvConfigManager = new KVConfigManager(this);
this.routeInfoManager = new RouteInfoManager();
this.brokerHousekeepingService = new BrokerHousekeepingService(this);
this.configuration = new Configuration(
log,
this.namesrvConfig, this.nettyServerConfig
);
this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
}
start(controller) 第二步,启动NamesrvController
public static NamesrvController start(final NamesrvController controller) throws Exception {
// 如果Controller创建失败了,则抛出异常
if (null == controller) {
throw new IllegalArgumentException("NamesrvController is null");
}
// 初始化NamesrvController,详见下文
boolean initResult = controller.initialize();
// 如果初始化失败了,则调用shutdown进行收尾
if (!initResult) {
controller.shutdown();
System.exit(-3);
}
// 注册一个Hook,在Java程序关闭的之后,将会执行这个钩子函数,执行controller的shutdown方法
// 进行收尾
Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
@Override
public Void call() throws Exception {
controller.shutdown();
return null;
}
}));
// 启动NamesrvController
controller.start();
return controller;
}
controller.initialize()
public boolean initialize() {
// 加载 ${user.home}/namesrv/kvConfig.json中的内容,
// 将其读取出来,最终序列号保存到configTable属性中
this.kvConfigManager.load();
// 创建Netty远程服务器,创建出Netty中的BossGroup以及WorkerGroup,
// 还有ServerBootstrap这些,但是,还没有启动Server监听端口
this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
// 创建一个线程池
this.remotingExecutor =
Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
// 注册处理器,内部根据是否集群,创建出不同的处理器,
// 集群:ClusterTestRequestProcessor
// 非集群:DefaultRequestProcessor
this.registerProcessor();
// 开启一个定时的线程池,定时扫描不活跃的Broker(scanNotActiveBroker)
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.routeInfoManager.scanNotActiveBroker();
}
}, 5, 10, TimeUnit.SECONDS);
// 开启一个线程池,定时打印一些东西
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NamesrvController.this.kvConfigManager.printAllPeriodically();
}
}, 1, 10, TimeUnit.MINUTES);
// 这是监测SSL的变化
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
fileWatchService = new FileWatchService(
new String[] {
TlsSystemConfig.tlsServerCertPath,
TlsSystemConfig.tlsServerKeyPath,
TlsSystemConfig.tlsServerTrustCertPath
},
new FileWatchService.Listener() {
boolean certChanged, keyChanged = false;
@Override
public void onChanged(String path) {
if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
log.info("The trust certificate changed, reload the ssl context");
reloadServerSslContext();
}
if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
certChanged = true;
}
if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
keyChanged = true;
}
if (certChanged && keyChanged) {
log.info("The certificate and private key changed, reload the ssl context");
certChanged = keyChanged = false;
reloadServerSslContext();
}
}
private void reloadServerSslContext() {
((NettyRemotingServer) remotingServer).loadSslContext();
}
});
} catch (Exception e) {
log.warn("FileWatchService created error, can't load the certificate dynamically");
}
}
return true;
}
controller.start()
// 启动NettyRemotingServer,remotingServer.start()的详细,详见附录
this.remotingServer.start();
if (this.fileWatchService != null) {
// 如果需要监听SSL的变化的话,启动这个线程
this.fileWatchService.start();
}
打印日志,NamesrvController启动成功
Netty NamesrvController的分析到这。。。
附录:
附录一:remotingServer.start()
@Override
public void start() {
// 创建一个线程组
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
}
});
// 预先创建好编解码,以及Netty服务端的处理器
/**
* handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
* encoder = new NettyEncoder();
* connectionManageHandler = new NettyConnectManageHandler();
* serverHandler = new NettyServerHandler();
*/
prepareSharableHandlers();
// channel 设置Netty的通道实现类型
// option 设置Netty的参数
// localAddress 设置Netty的本地地址
// childHandler 设置Netty workerGroup的处理器链
// 其中,在这,添加了如下的几种: 1. 名字:handshakeHandler,握手处理器
// 2. NettyDecoder 实现了 LengthFieldBasedFrameDecoder,通过协议上的开头包长度,来拼包拆包
// 3. IdleStateHandler 心跳包
// 4. NettyConnectManageHandler 链接管理处理器(ChannelDuplexHandler 双向处理器)
// 5. NettyServerHandler -> processMessageReceived
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
connectionManageHandler,
serverHandler
);
}
});
if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
}
try {
// 启动Netty
ChannelFuture sync = this.serverBootstrap.bind().sync();
InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
this.port = addr.getPort();
} catch (InterruptedException e1) {
throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
}
if (this.channelEventListener != null) {
this.nettyEventExecutor.start();
}
// 定时器、
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
}