环境准备
基本yum源
yum install -y epel-release
yum install -y wget bash-com* git
yum update -y
yum -y install gcc bc gcc-c++ ncurses ncurses-devel cmake elfutils-libelf-devel openssl-devel flex* bison* autoconf automake zlib* fiex* libxml* ncurses-devel libmcrypt* libtool-ltdl-devel* make cmake pcre pcre-devel openssl openssl-devel jemalloc-devel tlc libtool vim unzip wget lrzsz bash-comp* ipvsadm ipset jq sysstat conntrack libseccomp conntrack-tools socat curl wget git conntrack-tools psmisc nfs-utils tree bash-completion conntrack libseccomp net-tools crontabs sysstat iftop nload strace bind-utils tcpdump htop telnet lsof
关闭防火墙,swap,selinux
#关闭防火墙
systemctl disable --now firewalld
#关闭swap
swapoff -a
sed -ri '/^[^#]*swap/s@^@#@' /etc/fstab
#关闭selinux
setenforce 0
sed -ri '/^[^#]*SELINUX=/s#=.+$#=disabled#' /etc/selinux/config
主机
hostname | ip | 安装组件 |
---|---|---|
master01 | 10.140.0.18 | nameserver, 主broker |
master02 | 10.140.0.19 | nameserver, 主broker |
slave01 | 10.140.0.20 | 从broker |
slave02 | 10.140.0.21 | 从broker |
安装jdk 1.8
yum -y install java-1.8.0-openjdk java-1.8.0-openjdk-devel
RocketMQ 角色介绍
Producer
:消息生产者,负责生产消息,一般由业务系统负责生产消息。Consumer
:消息消费者,负责消费消息,一般是后台系统负责异步消费, 以下为RocketMQ不同Consumer简介。Push Consumer
:向Consumer对象注册一个监听器,监听推送来的消息,进行消费。Pull Consumer
:主动请求Broker拉取消息,然后对消息消费。
Producer Group
:生产者集合,一般用于发送同一类消息。Consumer Group
:消费者集合,一般用于接收一类消息进行消费。Broker
:MQ消息服务器,用于消息存储和转发。NameServer
:Broker协调者,类似Zookeeper的功能。Topic
:消息的主题,发送者可以向多个Topic发送消息,消费者也可以向多个Topic订阅消息。Message Queue
:在Topic
内部的小类的分区,可以进行发送和接收的处理。
集群工作流程
- 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
下载mq
wget https://mirror-hk.koddos.net/apache/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
unzip rocketmq-all-4.7.1-bin-release.zip
mv rocketmq-all-4.7.1-bin-release /usr/local/rocketmq
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
修改配置文件
master01
vim /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=10.140.0.18:9876;10.140.0.19:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=4194304
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
slave01
vim /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=10.140.0.18:9876;10.140.0.19:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=4194304
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
master02
vim /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=10.140.0.18:9876;10.140.0.19:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=4194304
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
slave02
vim /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=10.140.0.18:9876;10.140.0.19:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=4194304
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
修改jvm
默认nameserver需要4g内存,broker需要8g内存,所以一台机器起码16G内存才行。
更改启动脚本runbroker.sh和runserver.sh的配置,主要修改JAVA.OPT
vim /usr/local/rocketmq/bin/runserver.sh
vim /usr/local/rocketmq/bin/runbroker.sh
启动集群
启动nameserver
cd /usr/local/rocketmq/bin
nohup ./mqnamesrv &
启动broker
master01
cd /usr/local/rocketmq/bin
nohup ./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
slave01
cd /usr/local/rocketmq/bin
nohup ./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
master02
cd /usr/local/rocketmq/bin
nohup ./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
slave02
cd /usr/local/rocketmq/bin
nohup ./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
集群监控平台搭建
在master01上操作。
概述: RocketMQ
有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫rocketmq-console
,这个便是管理控制台项目了,先将incubator-rocketmq-externals拉到本地,因为我们需要自己对rocketmq-console
进行编译打包运行。
git clone https://github.com/apache/rocketmq-externals.git
打包前在rocketmq-console
中配置namesrv
集群地址:
vim rocketmq-externals/rocketmq-console/src/main/resources/application.properties
rocketmq.config.namesrvAddr=10.140.0.18:9876;10.140.0.19:9876
编译并打包
wget https://apache.website-solution.net/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
tar xvf apache-maven-3.6.3-bin.tar.gz
mv apache-maven-3.6.3 /usr/local/maven
echo 'export MAVEN_HOME=/usr/local/maven' >> /etc/profile
echo 'export PATH=$PATH:${MAVEN_HOME}/bin' >> /etc/profile
source /etc/profile
cd rocketmq-externals/rocketmq-console/
mvn clean package -Dmaven.test.skip=true
启动rocketmq-console:
cd target
nohup java -jar rocketmq-console-ng-2.0.0.jar &
访问http://localhost:8080
验证RocketMQ功能正常
验证生产消息正常
cd /usr/local/rocketmq/
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
能看到类似如下输出即成功
SendResult [sendStatus=SEND_OK, msgId=C0A82BC5F36C511D50C05B41...
验证消费消息正常
cd /usr/local/rocketmq/
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
能看到类似如下输出即成功
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=3....