前置条件

1.配置jdk1.8 环境变量 ,如果没配置的话参考这个博客去配置: https://blog.csdn.net/qq_41489540/article/details/116399710

2.集群批量执行命令的脚本,看这个博客去整一个:

https://blog.csdn.net/qq_41489540/article/details/109094840

为什么让整这个脚本呢?原因是集群集群很多,如果你一个一个去执行命令安装的话,会累死你的.弄了这个脚本之后就可以实现在一台机器敲完命令之后,集群多台机器都能执行到这个命令.

3.xsync集群同步脚本:

https://blog.csdn.net/qq_41489540/article/details/109094046

搭建计划

这次搭建一个2主2从异步刷盘的集群

机器名 nameServer节点部署 broker节点部署
zjj101 nameserver
zjj102 nameserver broker-a ,broker-b-s
zjj103 nameserver broker-b,brkoer-a-s

说明,broker-a 意思是amaster节点 , brkoer-a-s的意思是a主的slave节点

关闭集群中的防火墙

我用的 xcall脚本, 你也可以自己登录集群中的每一台机器,自己去执行 systemctl stop firewalld.service 关闭

  1. [root@zjj101 /]# xcall systemctl stop firewalld.service
  2. 要执行的命令是systemctl stop firewalld.service
  3. ---------------------zjj101-----------------
  4. ---------------------zjj102-----------------
  5. ---------------------zjj103-----------------
  6. [root@zjj101 /]# xcall firewall-cmd --state
  7. 要执行的命令是firewall-cmd --state
  8. ---------------------zjj101-----------------
  9. not running
  10. ---------------------zjj102-----------------
  11. not running
  12. ---------------------zjj103-----------------
  13. not running
  14. [root@zjj101 /]#

上传rocketMQ安装包到zjj101机器上

我直接在 /root/soft/ 目录下放了一个 rocketmq-all-4.7.1-bin-release.rar ,你们自己自行准备.

或者从下面百度云网盘里面下载
链接: https://pan.baidu.com/s/1xhcPh0Dk18THT1s-ArDGkQ 提取码: ua2c 复制这段内容后打开百度网盘手机App,操作更方便哦

解压

如果没有安装rar包的,参考: https://zjj1994.blog.csdn.net/article/details/120802578 博客

解压

  1. [root@zjj101 soft]# unrar x rocketmq-all-4.7.1-bin-release.rar

集群分发过去

写相对路径,命令: sh xsync soft/

  1. [root@zjj101 ~]# pwd
  2. /root
  3. [root@zjj101 ~]# ls
  4. anaconda-ks.cfg apache-tomcat-8.5.28.tar.gz soft
  5. apache-tomcat-8.5.28 script
  6. [root@zjj101 ~]# sh xsync soft/

查看各个机器复制情况

  1. [root@zjj101 ~]# sh xcall ls
  2. 要执行的命令是ls
  3. ---------------------zjj101-----------------
  4. anaconda-ks.cfg
  5. apache-tomcat-8.5.28
  6. apache-tomcat-8.5.28.tar.gz
  7. script
  8. soft
  9. ---------------------zjj102-----------------
  10. anaconda-ks.cfg
  11. soft
  12. ---------------------zjj103-----------------
  13. anaconda-ks.cfg
  14. apache-tomcat-8.5.28
  15. apache-tomcat-8.5.28.tar.gz
  16. soft
  17. [root@zjj101 ~]#

配置第一组broker-a

配置主节点

在zjj102上先配置borker-a的master节点。先配置2m-2s-async/broker-a.properties

进入 :/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 路径下

  1. [root@zjj102 2m-2s-async]# pwd
  2. /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async
  3. [root@zjj102 2m-2s-async]# vim broker-a.properties

修改 broker-a.properties 文件

  1. #所属集群名字,名字一样的节点就在同一个集群内
  2. brokerClusterName=rocketmq-cluster
  3. #broker名字,名字一样的节点就是一组主从节点。
  4. brokerName=broker-a
  5. #brokerid,0就表示是Master,>0的都是表示 Slave
  6. brokerId=0
  7. #nameServer地址,分号分割
  8. namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
  9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
  10. defaultTopicQueueNums=4
  11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
  12. autoCreateTopicEnable=true
  13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  14. autoCreateSubscriptionGroup=true
  15. #Broker 对外服务的监听端口
  16. listenPort=10911
  17. #删除文件时间点,默认凌晨 4点
  18. deleteWhen=04
  19. #文件保留时间,默认 48 小时
  20. fileReservedTime=120
  21. #commitLog每个文件的大小默认1G
  22. mapedFileSizeCommitLog=1073741824
  23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  24. mapedFileSizeConsumeQueue=300000
  25. #destroyMapedFileIntervalForcibly=120000
  26. #redeleteHangedFileInterval=120000
  27. #检测物理文件磁盘空间
  28. diskMaxUsedSpaceRatio=88
  29. #存储路径
  30. storePathRootDir=/app/rocketmq/store
  31. #commitLog 存储路径
  32. storePathCommitLog=/app/rocketmq/store/commitlog
  33. #消费队列存储路径存储路径
  34. storePathConsumeQueue=/app/rocketmq/store/consumequeue
  35. #消息索引存储路径
  36. storePathIndex=/app/rocketmq/store/index
  37. #checkpoint 文件存储路径
  38. storeCheckpoint=/app/rocketmq/store/checkpoint
  39. #abort 文件存储路径
  40. abortFile=/app/rocketmq/store/abort
  41. #限制的消息大小
  42. maxMessageSize=65536
  43. #flushCommitLogLeastPages=4
  44. #flushConsumeQueueLeastPages=2
  45. #flushCommitLogThoroughInterval=10000
  46. #flushConsumeQueueThoroughInterval=60000
  47. #Broker 的角色
  48. #- ASYNC_MASTER 异步复制Master
  49. #- SYNC_MASTER 同步双写Master
  50. #- SLAVE
  51. brokerRole=ASYNC_MASTER
  52. #刷盘方式
  53. #- ASYNC_FLUSH 异步刷盘
  54. #- SYNC_FLUSH 同步刷盘
  55. flushDiskType=ASYNC_FLUSH
  56. #checkTransactionMessageEnable=false
  57. #发消息线程池数量
  58. #sendMessageThreadPoolNums=128
  59. #拉消息线程池数量
  60. #pullMessageThreadPoolNums=128

配置从节点

这一组broker的主节点在zjj103上,所以需要配置zjj103上的config/2m-2s-async/broker-
b.properties

进入/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 文件夹下

  1. cd /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async

修改 broker-a-s.properties 文件

  1. #所属集群名字,名字一样的节点就在同一个集群内
  2. brokerClusterName=rocketmq-cluster
  3. #broker名字,名字一样的节点就是一组主从节点。
  4. brokerName=broker-a
  5. #brokerid,0就表示是Master,>0的都是表示 Slave
  6. brokerId=1
  7. #nameServer地址,分号分割
  8. namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
  9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
  10. defaultTopicQueueNums=4
  11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
  12. autoCreateTopicEnable=true
  13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  14. autoCreateSubscriptionGroup=true
  15. #Broker 对外服务的监听端口
  16. listenPort=11011
  17. #删除文件时间点,默认凌晨 4点
  18. deleteWhen=04
  19. #文件保留时间,默认 48 小时
  20. fileReservedTime=120
  21. #commitLog每个文件的大小默认1G
  22. mapedFileSizeCommitLog=1073741824
  23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  24. mapedFileSizeConsumeQueue=300000
  25. #destroyMapedFileIntervalForcibly=120000
  26. #redeleteHangedFileInterval=120000
  27. #检测物理文件磁盘空间
  28. diskMaxUsedSpaceRatio=88
  29. #存储路径
  30. storePathRootDir=/app/rocketmq/storeSlave
  31. #commitLog 存储路径
  32. storePathCommitLog=/app/rocketmq/storeSlave/commitlog
  33. #消费队列存储路径存储路径
  34. storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
  35. #消息索引存储路径
  36. storePathIndex=/app/rocketmq/storeSlave/index
  37. #checkpoint 文件存储路径
  38. storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
  39. #abort 文件存储路径
  40. abortFile=/app/rocketmq/storeSlave/abort
  41. #限制的消息大小
  42. maxMessageSize=65536
  43. #flushCommitLogLeastPages=4
  44. #flushConsumeQueueLeastPages=2
  45. #flushCommitLogThoroughInterval=10000
  46. #flushConsumeQueueThoroughInterval=60000
  47. #Broker 的角色
  48. #- ASYNC_MASTER 异步复制Master
  49. #- SYNC_MASTER 同步双写Master
  50. #- SLAVE
  51. brokerRole=SLAVE
  52. #刷盘方式
  53. #- ASYNC_FLUSH 异步刷盘
  54. #- SYNC_FLUSH 同步刷盘
  55. flushDiskType=ASYNC_FLUSH
  56. #checkTransactionMessageEnable=false
  57. #发消息线程池数量
  58. #sendMessageThreadPoolNums=128
  59. #拉消息线程池数量
  60. #pullMessageThreadPoolNums=128

配置第二组Broker-b

配置主节点

这一组broker的主节点在zjj103上,所以需要配置zjj103上的config/2m-2s-async/broker-
b.properties

在 /root/soft/rocketmq-all-4.7.1-bin-release/conf/2m-2s-async 文件夹下修改 broker-
b.properties

  1. #所属集群名字,名字一样的节点就在同一个集群内
  2. brokerClusterName=rocketmq-cluster
  3. #broker名字,名字一样的节点就是一组主从节点。
  4. brokerName=broker-b
  5. #brokerid,0就表示是Master,>0的都是表示 Slave
  6. brokerId=0
  7. #nameServer地址,分号分割
  8. namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
  9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
  10. defaultTopicQueueNums=4
  11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
  12. autoCreateTopicEnable=true
  13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  14. autoCreateSubscriptionGroup=true
  15. #Broker 对外服务的监听端口
  16. listenPort=10911
  17. #删除文件时间点,默认凌晨 4点
  18. deleteWhen=04
  19. #文件保留时间,默认 48 小时
  20. fileReservedTime=120
  21. #commitLog每个文件的大小默认1G
  22. mapedFileSizeCommitLog=1073741824
  23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  24. mapedFileSizeConsumeQueue=300000
  25. #destroyMapedFileIntervalForcibly=120000
  26. #redeleteHangedFileInterval=120000
  27. #检测物理文件磁盘空间
  28. diskMaxUsedSpaceRatio=88
  29. #存储路径
  30. storePathRootDir=/app/rocketmq/store
  31. #commitLog 存储路径
  32. storePathCommitLog=/app/rocketmq/store/commitlog
  33. #消费队列存储路径存储路径
  34. storePathConsumeQueue=/app/rocketmq/store/consumequeue
  35. #消息索引存储路径
  36. storePathIndex=/app/rocketmq/store/index
  37. #checkpoint 文件存储路径
  38. storeCheckpoint=/app/rocketmq/store/checkpoint
  39. #abort 文件存储路径
  40. abortFile=/app/rocketmq/store/abort
  41. #限制的消息大小
  42. maxMessageSize=65536
  43. #flushCommitLogLeastPages=4
  44. #flushConsumeQueueLeastPages=2
  45. #flushCommitLogThoroughInterval=10000
  46. #flushConsumeQueueThoroughInterval=60000
  47. #Broker 的角色
  48. #- ASYNC_MASTER 异步复制Master
  49. #- SYNC_MASTER 同步双写Master
  50. #- SLAVE
  51. brokerRole=ASYNC_MASTER
  52. #刷盘方式
  53. #- ASYNC_FLUSH 异步刷盘
  54. #- SYNC_FLUSH 同步刷盘
  55. flushDiskType=ASYNC_FLUSH
  56. #checkTransactionMessageEnable=false
  57. #发消息线程池数量
  58. #sendMessageThreadPoolNums=128
  59. #拉消息线程池数量
  60. #pullMessageThreadPoolNums=128

配置从节点

然后他对应的slave在zjj102上,修改zjj102上的 conf/2m-2s-async/broker-b-s.properties

  1. #所属集群名字,名字一样的节点就在同一个集群内
  2. brokerClusterName=rocketmq-cluster
  3. #broker名字,名字一样的节点就是一组主从节点。
  4. brokerName=broker-b
  5. #brokerid,0就表示是Master,>0的都是表示 Slave
  6. brokerId=1
  7. #nameServer地址,分号分割
  8. namesrvAddr=zjj101:9876;zjj102:9876;zjj103:9876
  9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
  10. defaultTopicQueueNums=4
  11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
  12. autoCreateTopicEnable=true
  13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  14. autoCreateSubscriptionGroup=true
  15. #Broker 对外服务的监听端口
  16. listenPort=11011
  17. #删除文件时间点,默认凌晨 4点
  18. deleteWhen=04
  19. #文件保留时间,默认 48 小时
  20. fileReservedTime=120
  21. #commitLog每个文件的大小默认1G
  22. mapedFileSizeCommitLog=1073741824
  23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  24. mapedFileSizeConsumeQueue=300000
  25. #destroyMapedFileIntervalForcibly=120000
  26. #redeleteHangedFileInterval=120000
  27. #检测物理文件磁盘空间
  28. diskMaxUsedSpaceRatio=88
  29. #存储路径
  30. storePathRootDir=/app/rocketmq/storeSlave
  31. #commitLog 存储路径
  32. storePathCommitLog=/app/rocketmq/storeSlave/commitlog
  33. #消费队列存储路径存储路径
  34. storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
  35. #消息索引存储路径
  36. storePathIndex=/app/rocketmq/storeSlave/index
  37. #checkpoint 文件存储路径
  38. storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
  39. #abort 文件存储路径
  40. abortFile=/app/rocketmq/storeSlave/abort
  41. #限制的消息大小
  42. maxMessageSize=65536
  43. #flushCommitLogLeastPages=4
  44. #flushConsumeQueueLeastPages=2
  45. #flushCommitLogThoroughInterval=10000
  46. #flushConsumeQueueThoroughInterval=60000
  47. #Broker 的角色
  48. #- ASYNC_MASTER 异步复制Master
  49. #- SYNC_MASTER 同步双写Master
  50. #- SLAVE
  51. brokerRole=SLAVE
  52. #刷盘方式
  53. #- ASYNC_FLUSH 异步刷盘
  54. #- SYNC_FLUSH 同步刷盘
  55. flushDiskType=ASYNC_FLUSH
  56. #checkTransactionMessageEnable=false
  57. #发消息线程池数量
  58. #sendMessageThreadPoolNums=128
  59. #拉消息线程池数量
  60. #pullMessageThreadPoolNums=128

注意点

这样broker就配置完成了。
需要注意的配置项:1、同一机器上两个实例的store目录不能相同,否则会报错 Lock failed,MQ
already started
2、同一机器上两个实例的listenPort也不能相同。否则会报端口占用的错
nameserver不需要进行配置,直接启动就行。这也看出nameserver是无状态的。

配置环境变量

配置 : export ROCKETMQ_HOME=/root/soft/rocketmq-all-4.7.1-bin-release 这行, 路径写你RocketMQ位置

zjj101机器配置:

  1. export PATH="$PATH:/root/script/"# RocketMQexport ROCKETMQ_HOME=/root/soft/rocketmq-all-4.7.1-bin-releaseexport PATH=${ROCKETMQ_HOME}/bin:$PATHexport PATH

配置完了之后集群分发一下 :

在/etc目录下执行 : sh xsync ./profile

  1. [root@zjj101 etc]# sh xsync ./profile要分发的文件的路径是:/etc/profile---------------------zjj102---------------------sending incremental file listprofilesent 640 bytes received 53 bytes 462.00 bytes/sectotal size is 1,941 speedup is 2.80---------------------zjj103---------------------sending incremental file listprofilesent 640 bytes received 53 bytes 1,386.00 bytes/sectotal size is 1,941 speedup is 2.80[root@zjj101 etc]#

集群更新下配置文件 ,命令: sh xcall source /etc/profile

  1. [root@zjj101 etc]# sh xcall source /etc/profile
  2. 要执行的命令是source /etc/profile
  3. ---------------------zjj101-----------------
  4. ---------------------zjj102-----------------
  5. ---------------------zjj103-----------------

验证是否配置成功 :

命令: sh xcall echo $ROCKETMQ_HOME ,看 打印结果 说明配置成功了

  1. [root@zjj101 etc]# sh xcall echo $ROCKETMQ_HOME
  2. 要执行的命令是echo /root/soft/rocketmq-all-4.7.1-bin-release
  3. ---------------------zjj101-----------------
  4. /root/soft/rocketmq-all-4.7.1-bin-release
  5. ---------------------zjj102-----------------
  6. /root/soft/rocketmq-all-4.7.1-bin-release
  7. ---------------------zjj103-----------------
  8. /root/soft/rocketmq-all-4.7.1-bin-release

启动RocketMQ集群

启动就比较简单了,直接调用bin目录下的脚本就行。只是启动之前要注意看下他们的JVM内存配置,默
认的配置都比较高。

先启动nameServer

修改zjj101机器上的runserver.sh配置

修改三个节点上的bin/runserver.sh,调整里面的jvm内存配置。找到下面这一行调整下内存,实际情况根据你们公司的电脑配置来调整,我这里是搞着玩的,所以jvm配置就调整小一些

修改 “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh” 文件,JAVA_OPT=”${JAVA_OPT} -server配置,大概是67行的位置

先修改zjj101机器的配置文件

  1. JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

集群分发给别的集群节点上

修改完了保存后,集群分发一下

命令: sh xsync “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh”

  1. [root@zjj101 bin]# sh xsync "/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh"
  2. 要分发的文件的路径是:/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh
  3. ---------------------zjj102---------------------
  4. sending incremental file list
  5. runserver.sh
  6. sent 921 bytes received 65 bytes 657.33 bytes/sec
  7. total size is 3,527 speedup is 3.58
  8. ---------------------zjj103---------------------
  9. sending incremental file list
  10. runserver.sh
  11. sent 921 bytes received 65 bytes 1,972.00 bytes/sec
  12. total size is 3,527 speedup is 3.58
  13. [root@zjj101 bin]#

检查分发结果

分发完了不放心的话检查一下 ,用集群查看脚本 查看一下刚刚修改的配置是否分发成功了, 我这里查看内容修改成功了.

  1. [root@zjj101 bin]# sh xcall cat "/root/soft/rocketmq-all-4.7.1-bin-release/bin/runserver.sh"

集群启动nameServer

编写启动脚本

在/root/soft/script/创建一个”startRocketMqNameServer.sh” 文件

内容是:

  1. #!/bin/bash
  2. # 我服务器名字是 zjj101 zjj102 zjj103 这里调整你们自己的服务器名字.
  3. array=(zjj101 zjj102 zjj103)
  4. echo "开始启动RocketMQ集群的NamesrvStartup"
  5. for((i=0;i<${#array[@]};i++))
  6. do
  7. ssh ${array[i]} nohup sh $ROCKETMQ_HOME/bin/mqnamesrv > startNameServer.log 2>&1 &
  8. done

启动nameServer

执行刚刚写的脚本

  1. # 执行集群
  2. [root@zjj101 script]# sh startRocketMqNameServer.sh
  3. 开始启动RocketMQ集群的NamesrvStartup
  4. # 查看是否都启动成功
  5. [root@zjj101 script]# sh xcall jps -l
  6. 要执行的命令是jps -l
  7. ---------------------zjj101-----------------
  8. 78614 org.apache.rocketmq.namesrv.NamesrvStartup
  9. 78686 sun.tools.jps.Jps
  10. ---------------------zjj102-----------------
  11. 15555 org.apache.rocketmq.namesrv.NamesrvStartup
  12. 15577 sun.tools.jps.Jps
  13. ---------------------zjj103-----------------
  14. 7012 org.apache.rocketmq.namesrv.NamesrvStartup
  15. 7034 sun.tools.jps.Jps

再启动broker

修改 runbroker.sh内存

启动broker是使用的mqbroker指令,只是注意启动broker时需要通过-c 指定对应的配置文件。

在zjj102上启动broker-a的master节点和broker-b的slave节点

修改 zjj102 和 zjj103机器上的 “/root/soft/rocketmq-all-4.7.1-bin-release/bin/runbroker.sh” 的jvm启动内存,如果你不修改的话,默认内存设置的非常大,根据你电脑配置去修改

大概是第66行位置,修改 JAVA_OPT=”${JAVA_OPT} -server 配置

  1. JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

启动

zjj102机器上 cd到 /root/soft/rocketmq-all-4.7.1-bin-release/bin 目录下

执行下面两个命令

  1. nohup sh mqbroker -c ../conf/2m-2s-async/broker-a.properties > broker-a.log 2>&1 &
  2. nohup sh mqbroker -c ../conf/2m-2s-async/broker-b-s.properties >broker-b-s.log 2>&1&

查看是否执行成功,下面发现有两个BrokerStartup,就说明执行成功了.

  1. [root@zjj102 bin]# jps -l
  2. 20512 org.apache.rocketmq.broker.BrokerStartup
  3. 15555 org.apache.rocketmq.namesrv.NamesrvStartup
  4. 20357 org.apache.rocketmq.broker.BrokerStartup
  5. 21528 sun.tools.jps.Jps

zjj103机器上cd到 /root/soft/rocketmq-all-4.7.1-bin-release/bin 目录下

分别执行下面两个命令

  1. nohup sh mqbroker -c ../conf/2m-2s-async/broker-b.properties > broker-b.log 2>&1 &
  2. nohup sh mqbroker -c ../conf/2m-2s-async/broker-a-s.properties > broker-a-s.log 2>&1 &

查看是否启动成功,有两个 BrokerStartup 说明启动成功了.

  1. [root@zjj103 bin]# jps -l
  2. 8080 sun.tools.jps.Jps
  3. 7012 org.apache.rocketmq.namesrv.NamesrvStartup
  4. 7748 org.apache.rocketmq.broker.BrokerStartup
  5. 7775 org.apache.rocketmq.broker.BrokerStartup

搭建Rocketmq-console可视化插件

这里windows搭建的: 直接看 https://zjj1994.blog.csdn.net/article/details/120809685 博客, 我就不演示了. 很简单

通过Rocketmq-console查看集群情况,

查看到集群搭建成功.

Linux搭建 RocketMQ 2主2从集群 - 图1

执行官方示例测试集群是否好用

说明

在RocketMQ的安装包中,提供了一个tools.sh工具可以用来在命令行快速验证RocketMQ服务。我们在
zjj102上进入RocketMQ的安装目录:
发送消息:默认会发1000条消息

  1. bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

接收消息:

  1. bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

注意,这是官方提供的Demo,但是官方的源码中,这两个类都是没有指定nameServer的,所以
运行会有点问题。要指定NameServer地址,可以配置一个环境变量NAMESRV_ADDR,这样默认
会读取这个NameServer地址。可以配到.bash_profile里或者直接临时指定。

  1. export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'

然后就可以正常执行了。
这个NameServer地址的读取方式见源码中
org.apache.rocketmq.common.utils.NameServerAddressUtils

  1. public static String getNameServerAddresses() {
  2. return System.getProperty("rocketmq.namesrv.addr",
  3. System.getenv("NAMESRV_ADDR"));
  4. }

这个方法就是在DefaultMQProducer中默认的设置NameServer地址的方式,这个rokcetmq.namesrv.addr属性可以在java中使用System.setproperties指定,也可以在SpringBoot中配到配置文件里。

这个tools.sh就封装了一个简单的运行RocketMQ的环境,可以运行源码中的其他示例,然后自己
的例子也可以放到RocketMQ的lib目录下去执行。

开始演示

配置环境变量:

修改 /etc/profile 文件

  1. export NAMESRV_ADDR='zjj101:9876;zjj102:9876;zjj103:9876'

配置完了之后,别忘了执行 source /etc/profile 让环境变量生效.

执行 echo $NAMESRV_ADDR 查看环境变量是否配置有问题,能打印出 “zjj101:9876;zjj102:9876;zjj103:9876 “ 说明配置没问题

下面开始演示: 在RocketMq 安装目录下执行 : sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

  1. # 查看配置的环境变量是否生效
  2. [root@zjj102 rocketmq-all-4.7.1-bin-release]# echo $NAMESRV_ADDR
  3. # 说明生效了
  4. zjj101:9876;zjj102:9876;zjj103:9876
  5. # 执行官方示例
  6. [root@zjj102 rocketmq-all-4.7.1-bin-release]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
  7. # 下面开始打印日志了,说明生效了
  8. 12:58:13.013 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
  9. RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
  10. RocketMQLog:WARN Please initialize the logger system properly.
  11. SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BCD0000, offsetMsgId=AC100A6600002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
  12. SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEB0001, offsetMsgId=AC100A6600002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
  13. SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BEF0002, offsetMsgId=AC100A6600002A9F0000000000000196, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
  14. SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3BF10003, offsetMsgId=AC100A6700002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=0]
  15. SendResult [sendStatus=SEND_OK, msgId=AC100A665F824DC63996552E3C0F0004, offsetMsgId=AC100A6700002A9F00000000000000CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=0]

点击刷新查看多了个TopicTest,这个是执行官方示例自动生成的.

Linux搭建 RocketMQ 2主2从集群 - 图2

你再打开一个终端连接zjj102,执行下面的命令启动消费者

  1. sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

自己可以反复的启动 示例 , 生产消息和消费消息

点击 TopicTest的consumer管理来查看消费情况

Linux搭建 RocketMQ 2主2从集群 - 图3

下面显示的延迟1000 的意思就是有1000个没消费.你再执行 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer 启动官方消费者示例之后,你就能看到延迟变少了,就说明消费了.

Linux搭建 RocketMQ 2主2从集群 - 图4

下面图就说明都被这个消费者消费了, 延迟为0说明没有消息积压了.

Linux搭建 RocketMQ 2主2从集群 - 图5