一、环境搭建

1.1 环境要求

  • JDK 1.8
  • maven
  • IDEA

    1.2 源码下载

    1. git clone https://github.com/apache/rocketmq.git

    目录结构

  • broker: broker 模块(broke 启动进程)

  • client :消息客户端,包含消息生产者、消息消费者相关类
  • common :公共包
  • dev :开发者信息(非源代码)
  • distribution :部署实例文件夹(非源代码)
  • example: RocketMQ 例代码
  • filter :消息过滤相关基础类
  • filtersrv:消息过滤服务器实现相关类(Filter启动进程)
  • logappender:日志实现相关类
  • namesrv:NameServer实现相关类(NameServer启动进程)
  • openmessageing:消息开放标准
  • remoting:远程通信模块,给予Netty
  • srcutil:服务工具类
  • store:消息存储实现相关类
  • style:checkstyle相关实现
  • test:测试相关类
  • tools:工具类,监控命令相关实现类

    使用idea打开项目

    image.png

    进行安装,在命令行运行如下命令

    1. mvn clean package -DskipTests=true

    1.3 调试准备

    创建conf文件夹,从distribution下的conf目录下,复制 broker.conf、logback_broker.xml、logback_namesrv.xml 三个文件到新建的conf目录下,最终效果如图所示。
    image.png

    启动Namesrv

    首先,先添加环境变量,如下图,添加名为 ROCKETMQ_HOME 的环境变量,值为当前的项目路径
    image.png
    如果不做如上的操作,运行NamesrvStartup的话,会报如下的错误
    image.png
    添加完环境变量之后,运行NamesrvStartup,看到控制台打印如下结果,即Namesrv运行成功
    1. The Name Server boot success. serializeType=JSON

    启动Broker

    首先,BrokerStartup中,也要像NamesrvStartup一样,添加ROCKETMQ_HOME这个环境变量,并且,需要指定broker.conf所在的位置。
    image.png
    然后再修改 调试准备阶段中,复制到conf目录下的broker.conf配置文件,完整的broker.conf内容如下。 ```shell brokerClusterName = DefaultCluster brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH

namesrvAddr=127.0.0.1:9876 authCreateTopicEnable = true #开启自动创建topic的功能 storePathRootDir=/home/ifan/workspace/github-source/rocketmq/data enablePropertyFilter=true storePathCommitLog=/home/ifan/workspace/github-source/rocketmq/data/commitlog

  1. 看到如下的打印,证明broker启动成功
  2. ```shell
  3. The broker[broker-a, 172.17.0.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876

1.4 源码调试

生产者测试

使用rocketmq自带的测试用例进行测试(example Modules下的org.apache.rocketmq.example.quickstart.Producer),在其中添加namesrv的地址,运行main方法

消费者测试

使用rocketmq自带的测试用例进行测试(example Modules下的org.apache.rocketmq.example.quickstart.Consumer),在其中添加namesrv的地址,运行main方法

到这,源码的环境搭建就完成了

二、源码解析

2.1 Namesrv 源码分析

NamesrvStartup#main -> 调用main0

NamesrvStartup#main0

createNamesrvController(args) 第一步,创建NamesrvController

  1. public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
  2. // 1. 设置 ROCKETMQ 的服务端版本
  3. System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
  4. // 创建一个命令行的选项,用于mqnamesrv命令的一些选项输出
  5. // 在方法内部,创建出 -h -n -c -p的选项
  6. Options options = ServerUtil.buildCommandlineOptions(new Options());
  7. commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
  8. if (null == commandLine) {
  9. System.exit(-1);
  10. return null;
  11. }
  12. // 创建一个Namesrv的配置类
  13. final NamesrvConfig namesrvConfig = new NamesrvConfig();
  14. // 创建一个Netty服务端的配置
  15. final NettyServerConfig nettyServerConfig = new NettyServerConfig();
  16. // 设置Netty服务端的监听端口
  17. nettyServerConfig.setListenPort(9876);
  18. // 如果启动参数中,包含 -c参数,说明指定了配置文件
  19. if (commandLine.hasOption('c')) {
  20. String file = commandLine.getOptionValue('c');
  21. if (file != null) {
  22. // 指定了-c参数,就通过 InputStream读取这个配置文件的内容
  23. InputStream in = new BufferedInputStream(new FileInputStream(file));
  24. // 并将这个内容,存储到一个Properties文件中
  25. properties = new Properties();
  26. properties.load(in);
  27. // 然后,通过反射,将properties文件中的内容,存储到上面创建的两个配置类中
  28. MixAll.properties2Object(properties, namesrvConfig);
  29. MixAll.properties2Object(properties, nettyServerConfig);
  30. // 设置配置文件的路径
  31. namesrvConfig.setConfigStorePath(file);
  32. // 然后关闭文件流
  33. System.out.printf("load config properties file OK, %s%n", file);
  34. in.close();
  35. }
  36. }
  37. // 有 -p 参数,打印可配置的参数,到console中,然后System.exit退出程序
  38. if (commandLine.hasOption('p')) {
  39. InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
  40. MixAll.printObjectProperties(console, namesrvConfig);
  41. MixAll.printObjectProperties(console, nettyServerConfig);
  42. System.exit(0);
  43. }
  44. // 将命令行中的参数,设置到namesrvconfig中
  45. MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
  46. // 如果没有设置 ROCKETMQ_HOME 这个环境变量的话,就在这打印日志,然后报错异常退出
  47. if (null == namesrvConfig.getRocketmqHome()) {
  48. System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
  49. System.exit(-2);
  50. }
  51. // 获取日志上下文
  52. LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
  53. JoranConfigurator configurator = new JoranConfigurator();
  54. configurator.setContext(lc);
  55. lc.reset();
  56. // 设置日志的配置文件所在地
  57. configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
  58. // 获取日志对象
  59. log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
  60. // 将这个配置,打印在日志文件中
  61. MixAll.printObjectProperties(log, namesrvConfig);
  62. MixAll.printObjectProperties(log, nettyServerConfig);
  63. // 初始化NamesrvController,将两个配置类传入
  64. final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
  65. // 将properties中的内容,以及Configuration中的内容进行合并,方法内部调用 merge
  66. controller.getConfiguration().registerConfig(properties);
  67. return controller;
  68. }

new NamesrvController();

将传入的配置类啊,NamesrvController中的一些属性进行赋值以及创建

  1. public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
  2. this.namesrvConfig = namesrvConfig;
  3. this.nettyServerConfig = nettyServerConfig;
  4. this.kvConfigManager = new KVConfigManager(this);
  5. this.routeInfoManager = new RouteInfoManager();
  6. this.brokerHousekeepingService = new BrokerHousekeepingService(this);
  7. this.configuration = new Configuration(
  8. log,
  9. this.namesrvConfig, this.nettyServerConfig
  10. );
  11. this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
  12. }

start(controller) 第二步,启动NamesrvController

  1. public static NamesrvController start(final NamesrvController controller) throws Exception {
  2. // 如果Controller创建失败了,则抛出异常
  3. if (null == controller) {
  4. throw new IllegalArgumentException("NamesrvController is null");
  5. }
  6. // 初始化NamesrvController,详见下文
  7. boolean initResult = controller.initialize();
  8. // 如果初始化失败了,则调用shutdown进行收尾
  9. if (!initResult) {
  10. controller.shutdown();
  11. System.exit(-3);
  12. }
  13. // 注册一个Hook,在Java程序关闭的之后,将会执行这个钩子函数,执行controller的shutdown方法
  14. // 进行收尾
  15. Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
  16. @Override
  17. public Void call() throws Exception {
  18. controller.shutdown();
  19. return null;
  20. }
  21. }));
  22. // 启动NamesrvController
  23. controller.start();
  24. return controller;
  25. }

controller.initialize()
  1. public boolean initialize() {
  2. // 加载 ${user.home}/namesrv/kvConfig.json中的内容,
  3. // 将其读取出来,最终序列号保存到configTable属性中
  4. this.kvConfigManager.load();
  5. // 创建Netty远程服务器,创建出Netty中的BossGroup以及WorkerGroup,
  6. // 还有ServerBootstrap这些,但是,还没有启动Server监听端口
  7. this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
  8. // 创建一个线程池
  9. this.remotingExecutor =
  10. Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
  11. // 注册处理器,内部根据是否集群,创建出不同的处理器,
  12. // 集群:ClusterTestRequestProcessor
  13. // 非集群:DefaultRequestProcessor
  14. this.registerProcessor();
  15. // 开启一个定时的线程池,定时扫描不活跃的Broker(scanNotActiveBroker)
  16. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  17. @Override
  18. public void run() {
  19. NamesrvController.this.routeInfoManager.scanNotActiveBroker();
  20. }
  21. }, 5, 10, TimeUnit.SECONDS);
  22. // 开启一个线程池,定时打印一些东西
  23. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  24. @Override
  25. public void run() {
  26. NamesrvController.this.kvConfigManager.printAllPeriodically();
  27. }
  28. }, 1, 10, TimeUnit.MINUTES);
  29. // 这是监测SSL的变化
  30. if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
  31. // Register a listener to reload SslContext
  32. try {
  33. fileWatchService = new FileWatchService(
  34. new String[] {
  35. TlsSystemConfig.tlsServerCertPath,
  36. TlsSystemConfig.tlsServerKeyPath,
  37. TlsSystemConfig.tlsServerTrustCertPath
  38. },
  39. new FileWatchService.Listener() {
  40. boolean certChanged, keyChanged = false;
  41. @Override
  42. public void onChanged(String path) {
  43. if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
  44. log.info("The trust certificate changed, reload the ssl context");
  45. reloadServerSslContext();
  46. }
  47. if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
  48. certChanged = true;
  49. }
  50. if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
  51. keyChanged = true;
  52. }
  53. if (certChanged && keyChanged) {
  54. log.info("The certificate and private key changed, reload the ssl context");
  55. certChanged = keyChanged = false;
  56. reloadServerSslContext();
  57. }
  58. }
  59. private void reloadServerSslContext() {
  60. ((NettyRemotingServer) remotingServer).loadSslContext();
  61. }
  62. });
  63. } catch (Exception e) {
  64. log.warn("FileWatchService created error, can't load the certificate dynamically");
  65. }
  66. }
  67. return true;
  68. }

controller.start()
  1. // 启动NettyRemotingServer,remotingServer.start()的详细,详见附录
  2. this.remotingServer.start();
  3. if (this.fileWatchService != null) {
  4. // 如果需要监听SSL的变化的话,启动这个线程
  5. this.fileWatchService.start();
  6. }

打印日志,NamesrvController启动成功

Netty NamesrvController的分析到这。。。

附录:

附录一:remotingServer.start()

  1. @Override
  2. public void start() {
  3. // 创建一个线程组
  4. this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
  5. nettyServerConfig.getServerWorkerThreads(),
  6. new ThreadFactory() {
  7. private AtomicInteger threadIndex = new AtomicInteger(0);
  8. @Override
  9. public Thread newThread(Runnable r) {
  10. return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
  11. }
  12. });
  13. // 预先创建好编解码,以及Netty服务端的处理器
  14. /**
  15. * handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
  16. * encoder = new NettyEncoder();
  17. * connectionManageHandler = new NettyConnectManageHandler();
  18. * serverHandler = new NettyServerHandler();
  19. */
  20. prepareSharableHandlers();
  21. // channel 设置Netty的通道实现类型
  22. // option 设置Netty的参数
  23. // localAddress 设置Netty的本地地址
  24. // childHandler 设置Netty workerGroup的处理器链
  25. // 其中,在这,添加了如下的几种: 1. 名字:handshakeHandler,握手处理器
  26. // 2. NettyDecoder 实现了 LengthFieldBasedFrameDecoder,通过协议上的开头包长度,来拼包拆包
  27. // 3. IdleStateHandler 心跳包
  28. // 4. NettyConnectManageHandler 链接管理处理器(ChannelDuplexHandler 双向处理器)
  29. // 5. NettyServerHandler -> processMessageReceived
  30. ServerBootstrap childHandler =
  31. this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
  32. .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
  33. .option(ChannelOption.SO_BACKLOG, 1024)
  34. .option(ChannelOption.SO_REUSEADDR, true)
  35. .option(ChannelOption.SO_KEEPALIVE, false)
  36. .childOption(ChannelOption.TCP_NODELAY, true)
  37. .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
  38. .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
  39. .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
  40. .childHandler(new ChannelInitializer<SocketChannel>() {
  41. @Override
  42. public void initChannel(SocketChannel ch) throws Exception {
  43. ch.pipeline()
  44. .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
  45. .addLast(defaultEventExecutorGroup,
  46. encoder,
  47. new NettyDecoder(),
  48. new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
  49. connectionManageHandler,
  50. serverHandler
  51. );
  52. }
  53. });
  54. if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
  55. childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
  56. }
  57. try {
  58. // 启动Netty
  59. ChannelFuture sync = this.serverBootstrap.bind().sync();
  60. InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
  61. this.port = addr.getPort();
  62. } catch (InterruptedException e1) {
  63. throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
  64. }
  65. if (this.channelEventListener != null) {
  66. this.nettyEventExecutor.start();
  67. }
  68. // 定时器、
  69. this.timer.scheduleAtFixedRate(new TimerTask() {
  70. @Override
  71. public void run() {
  72. try {
  73. NettyRemotingServer.this.scanResponseTable();
  74. } catch (Throwable e) {
  75. log.error("scanResponseTable exception", e);
  76. }
  77. }
  78. }, 1000 * 3, 1000);
  79. }