什么是消息中间件

消息+中间件

消息:消息即为数据,数据就会有规划,有长度,有大小。
中间件:为我们提供发送消息的程序或者服务.
消息:
java提供了一套标准JMS(java message server)
JMS 消息主体(Body)
JMS提供五种消息主体的形式,每种形式通过消息接口定义:
StreamMessage
消息整体主体包含流式Java原生值,它是连续地被填充和读取的。
MapMessage
消息整体主体包含键值对集合,其中键为字符串,值为Java原生类型。条目访问可被计算器连续地或者名称随机地访问,它的顺序并不一定。
TextMessage
消息整体主体包含一个Java String 对象。
ObjectMessage
消息整体主体包含一个Serializable 对象,如果需要使用集合对象,确保JDK 1.2或更高。
BytesMessage

中间件:
为我们提供发送消息的程序或者服务,目前主流的有rocketMq 、kafka、rabbitMq、activemq等。
RocketMQ - 图1

为什么要用消息中间件?

为了对应用解耦,实现异步处理;还可以对瞬时高峰进行削峰;可以跨语言

RocketMQ - 图2

为什么是RocketMq

RocketMQ - 图3
官网:http://rocketmq.apache.org/
常见的消息中间件:Kafka、ActiveMq、Rocketmq、RabbitMQ
稳定无单点HA
集群功能完善
经历过双十一
Java语言实现
架构轻、源码可读性好(面向过程)
生态圈完善,配套好
开源社区活跃
普通消息、顺序消息、分布式事务消息、定时消息

架构图


RocketMQ - 图4
消息中间件和RPC最大区别:
Broker Cluster存储

1、环境

192.168.0.31
192.168.0.32
部署模式:2M-2S-SYNC(两主两从同步写)(机器不够用可以不用部署这么多2M-nosalve)
相关安装包存储路径:/root/svr/rocketmq

注意:部署的时候jdk最好用JDK8。
通过本地打包部署的话,要注意了windows和linux系统的转义符不一样,需要对脚本进行更改。

2、部署

2.1 下载

Apache:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip
Github:
https://github.com/apache/rocketmq/

2.2 解压:

unzip rocketmq-all-4.2.0-bin-release.zip -d /root/svr/rocketmq


2.3 配置host(给nameServer用):

命令:vim /etc/hosts

192.168.0.12 rocketmq1
192.168.0.13 rocketmq2

配置环境变量:vim /etc/profile

export ROCKETMQ_HOME=/root/svr/rocketmq
export PATH=$PATH::$ROCKETMQ_HOME/bin

执行:source /etc/profile

2.4 配置集群参数:

命令(master):vim /root/svr/rocketmq/conf/2m-2s-sync/broker-a.properties

brokerClusterName=tl-rocketmq-cluster
brokerName=broker-a
brokerId=0
namesrvAddr=rocketmq1:9876;rocketmq2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
storePathRootDir=/root/svr/rocketmq/data/store
storePathCommitLog=/root/svr/rocketmq/data/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH


命令(slave):vim /root/svr/rocketmq/conf/2m-2s-sync/broker-b-s.properties

brokerClusterName=tl-rocketmq-cluster
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
namesrvAddr=rocketmq1:9876;rocketmq2:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
listenPort=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
destroyMapedFileIntervalForcibly=120000
redeleteHangedFileInterval=120000
diskMaxUsedSpaceRatio=88
storePathRootDir=/root/svr/rocketmq/data/store
storePathCommitLog=/root/svr/rocketmq/data/store/commitlog
maxMessageSize=65536
flushCommitLogLeastPages=4
flushConsumeQueueLeastPages=2
flushCommitLogThoroughInterval=10000
flushConsumeQueueThoroughInterval=60000
checkTransactionMessageEnable=false
sendMessageThreadPoolNums=128
pullMessageThreadPoolNums=128
#角色
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

自动切换角色 zk redis 切换不自动切换

2.5 创建存储&日志文件

mkdir /root/svr/rocketmq/data (也要创建store文件夹)
mkdir/root/svr/rocketmq/data/store/commitlog
mkdir/root/svr/rocketmq/data/store/consumequeue
mkdir /root/svr/rocketmq/data/store/index

注意:如果一台虚拟机部署多个需要用文件进行区分

2.6 修改日志配置文件

mkdir -p /root/svr/rocketmq/logs
cd /root/svr/rocketmq/conf && sed-i ‘s#${user.home}#/root/svr/rocketmq#g’
*.xml

注意logback.*.xml配置文件中${user.home}需要替换自己指定的目录

2.7 改参数

runbroker.sh,runserver.sh启动参数默认对jvm的堆内存设置比较大(不改启动不起来),如果是虚拟机非线上环境需要改下参数,大小可以根据自己机器来决定

默认大小
-Xms8g -Xmx8g -Xmn4g
改为:
JAVA_OPT=”${JAVA_OPT} -server -Xms1g-Xmx1g -Xmn512m”

2.8 复制

scp其他服务器
scp -r rocketmq/root@192.168.0.13:/root/svr/

3、启动服务

3.1启动Nameserver服务

在cd /root/svr/rocketmq/bin目录下执行命令:

nohup sh mqnamesrv &

3.2启动BrokerServer

在cd /root/svr/rocketmq/bin目录下执行命令:

nohup sh mqbroker -c /root/svr/rocketmq/conf/2m-2s-sync/broker-a.properties &

3.3是否启动成功

输入命令jps或者查看rocketmq/logs下日志是否输出正常。
RocketMQ - 图5
安装成功。
查看集群监控状态:
sh mqadmin clusterlist -n 192.168.0.12:9876
RocketMQ - 图6RocketMQ - 图7

监控源码下载地址:

https://github.com/apache/rocketmq-externals.git
配置namesrv
rocketmq.config.namesrvAddr=192.168.0.31:9876

测试:

export NAMESRV_ADDR=rocketmq1:9876;rocketmq2:9876
测试发送端
> sh bin/tools.shorg.apache.rocketmq.example.quickstart.Producer
测试消费端
> sh bin/tools.shorg.apache.rocketmq.example.quickstart.Consumer

服务停止:

在cd /root/svr/rocketmq/bin目录下执行命令或者jps查看进程号 kill -9pid

sh mqshutdown broker
sh mqshutdown namesrv

常见错误


Jvm参数没有配置启动错误:

# There is insufficient memory for the Java RuntimeEnvironment to continue.
# Native memory allocation (malloc) failed to allocate8589934592 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /root/svr/rocketmq/bin/hs_err_pid30288.log
[root@localhost bin]# ll

启动日志报:Please set the JAVA_HOME variable inyour environment, We need java(x64)! !!
需要设置vim /etc/profile环境变量
export JAVA_HOME=/root/svr/jdk
source/etc/profile

各种消息队列对比.pdf

https://developer.aliyun.com/article/636883