什么是消息中间件
消息+中间件
消息:消息即为数据,数据就会有规划,有长度,有大小。
中间件:为我们提供发送消息的程序或者服务.
消息:
java提供了一套标准JMS(java message server)
JMS 消息主体(Body)
JMS提供五种消息主体的形式,每种形式通过消息接口定义:
StreamMessage
消息整体主体包含流式Java原生值,它是连续地被填充和读取的。
MapMessage
消息整体主体包含键值对集合,其中键为字符串,值为Java原生类型。条目访问可被计算器连续地或者名称随机地访问,它的顺序并不一定。
TextMessage
消息整体主体包含一个Java String 对象。
ObjectMessage
消息整体主体包含一个Serializable 对象,如果需要使用集合对象,确保JDK 1.2或更高。
BytesMessage
中间件:
为我们提供发送消息的程序或者服务,目前主流的有rocketMq 、kafka、rabbitMq、activemq等。
为什么要用消息中间件?
为了对应用解耦,实现异步处理;还可以对瞬时高峰进行削峰;可以跨语言
为什么是RocketMq
官网:http://rocketmq.apache.org/
常见的消息中间件:Kafka、ActiveMq、Rocketmq、RabbitMQ
稳定无单点HA
集群功能完善
经历过双十一
Java语言实现
架构轻、源码可读性好(面向过程)
生态圈完善,配套好
开源社区活跃
普通消息、顺序消息、分布式事务消息、定时消息
架构图
消息中间件和RPC最大区别:
Broker Cluster存储
1、环境
192.168.0.31
192.168.0.32
部署模式:2M-2S-SYNC(两主两从同步写)(机器不够用可以不用部署这么多2M-nosalve)
相关安装包存储路径:/root/svr/rocketmq
注意:部署的时候jdk最好用JDK8。
通过本地打包部署的话,要注意了windows和linux系统的转义符不一样,需要对脚本进行更改。
2、部署
2.1 下载
Apache:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip
Github:
https://github.com/apache/rocketmq/
2.2 解压:
unzip rocketmq-all-4.2.0-bin-release.zip -d /root/svr/rocketmq
2.3 配置host(给nameServer用):
命令:vim /etc/hosts
192.168.0.12 rocketmq1 192.168.0.13 rocketmq2 |
---|
配置环境变量:vim /etc/profile
export ROCKETMQ_HOME=/root/svr/rocketmq export PATH=$PATH::$ROCKETMQ_HOME/bin |
---|
2.4 配置集群参数:
命令(master):vim /root/svr/rocketmq/conf/2m-2s-sync/broker-a.properties
brokerClusterName=tl-rocketmq-cluster brokerName=broker-a brokerId=0 namesrvAddr=rocketmq1:9876;rocketmq2:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true listenPort=10911 deleteWhen=04 fileReservedTime=120 mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=300000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 diskMaxUsedSpaceRatio=88 storePathRootDir=/root/svr/rocketmq/data/store storePathCommitLog=/root/svr/rocketmq/data/store/commitlog maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000 checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128 brokerRole=SYNC_MASTER flushDiskType=ASYNC_FLUSH |
---|
命令(slave):vim /root/svr/rocketmq/conf/2m-2s-sync/broker-b-s.properties
brokerClusterName=tl-rocketmq-cluster brokerName=broker-a #0 表示 Master,>0 表示 Slave brokerId=1 namesrvAddr=rocketmq1:9876;rocketmq2:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true autoCreateSubscriptionGroup=true listenPort=10911 deleteWhen=04 fileReservedTime=120 mapedFileSizeCommitLog=1073741824 mapedFileSizeConsumeQueue=300000 destroyMapedFileIntervalForcibly=120000 redeleteHangedFileInterval=120000 diskMaxUsedSpaceRatio=88 storePathRootDir=/root/svr/rocketmq/data/store storePathCommitLog=/root/svr/rocketmq/data/store/commitlog maxMessageSize=65536 flushCommitLogLeastPages=4 flushConsumeQueueLeastPages=2 flushCommitLogThoroughInterval=10000 flushConsumeQueueThoroughInterval=60000 checkTransactionMessageEnable=false sendMessageThreadPoolNums=128 pullMessageThreadPoolNums=128 #角色 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH |
---|
2.5 创建存储&日志文件
mkdir /root/svr/rocketmq/data (也要创建store文件夹) mkdir/root/svr/rocketmq/data/store/commitlog mkdir/root/svr/rocketmq/data/store/consumequeue mkdir /root/svr/rocketmq/data/store/index |
---|
2.6 修改日志配置文件
mkdir -p /root/svr/rocketmq/logs cd /root/svr/rocketmq/conf && sed-i ‘s#${user.home}#/root/svr/rocketmq#g’ *.xml |
---|
注意logback.*.xml配置文件中${user.home}需要替换自己指定的目录
2.7 改参数
runbroker.sh,runserver.sh启动参数默认对jvm的堆内存设置比较大(不改启动不起来),如果是虚拟机非线上环境需要改下参数,大小可以根据自己机器来决定
默认大小 -Xms8g -Xmx8g -Xmn4g 改为: JAVA_OPT=”${JAVA_OPT} -server -Xms1g-Xmx1g -Xmn512m” |
---|
2.8 复制
scp其他服务器
scp -r rocketmq/root@192.168.0.13:/root/svr/
3、启动服务
3.1启动Nameserver服务
在cd /root/svr/rocketmq/bin目录下执行命令:
nohup sh mqnamesrv & |
---|
3.2启动BrokerServer
在cd /root/svr/rocketmq/bin目录下执行命令:
nohup sh mqbroker -c /root/svr/rocketmq/conf/2m-2s-sync/broker-a.properties & |
---|
3.3是否启动成功
输入命令jps或者查看rocketmq/logs下日志是否输出正常。
安装成功。
查看集群监控状态:
sh mqadmin clusterlist -n 192.168.0.12:9876
监控源码下载地址:
https://github.com/apache/rocketmq-externals.git
配置namesrv
rocketmq.config.namesrvAddr=192.168.0.31:9876
测试:
export NAMESRV_ADDR=rocketmq1:9876;rocketmq2:9876 测试发送端 > sh bin/tools.shorg.apache.rocketmq.example.quickstart.Producer 测试消费端 > sh bin/tools.shorg.apache.rocketmq.example.quickstart.Consumer |
---|
服务停止:
在cd /root/svr/rocketmq/bin目录下执行命令或者jps查看进程号 kill -9pid
sh mqshutdown broker sh mqshutdown namesrv |
---|
常见错误
Jvm参数没有配置启动错误:
# There is insufficient memory for the Java RuntimeEnvironment to continue.
# Native memory allocation (malloc) failed to allocate8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/svr/rocketmq/bin/hs_err_pid30288.log
[root@localhost bin]# ll
启动日志报:Please set the JAVA_HOME variable inyour environment, We need java(x64)! !!
需要设置vim /etc/profile环境变量
export JAVA_HOME=/root/svr/jdk
source/etc/profile