官网地址:
https://rocketmq.apache.org/release_notes/release-notes-4.7.1/

架构
RocketMq安装 - 图1

概念

NameServer Cluster

相当于注册中心,生产者消费者链接这个地址

Broker Cluster

具体的服务端几圈

Producer Cluster

生成者集群

Consumer Cluster

消费者集群

image.png
下载 rocketmq-all-4.71-bin-release.zip

先解压

unzip rocketmq-all-4.7.1-bin-release.zip

单机版

1:启动NameServer

修改环境变量

export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release

修改启动jvm参数
runserver.sh
默认给的太大了

JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m”

启动命令:

nohup bin/mqnamesrv -n “124.221.185.240:9876” &

2:启动broker

修改broker.conf

针对配置在云上采用供方访问

namesrvAddr=公网ip:9876
brokerIP1=公网ip

是否允许 Broker 自动创建Topic,建议线下开启,线上关闭

autoCreateTopicEnable=true

是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭

autoCreateSubscriptionGroup=true

开启SQL过滤

enablePropertyFilter=true

修改
runbroker.sh 默认太大了

JAVA_OPT=”${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m”

启动

nohup bin/mqbroker -n 124.221.185.240:9876 -c conf/broker.conf &

集群版

集群机构

机器名 nemaeServer节点部署 broker节点部署
worker1 nameserver
worker2 nameserver broker-a, broker-b-s
worker3 nameserver broker-b,broker-a-s

conf/2m-2s-async下的配置文件
在rocketmq的config目录下可以看到rocketmq建议的各种配置方式:

  • 2m-2s-async: 2主2从异步刷盘(吞吐量较大,但是消息可能丢失),
  • 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全),
  • 2m-noslave:2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置。
  • 而dleger就是用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

    1:修改broker-a.properties

    1. #所属集群名字,名字一样的节点就在同一个集群内
    2. brokerClusterName=rocketmq-cluster
    3. #broker名字,名字一样的节点就是一组主从节点。
    4. brokerName=broker-a
    5. #brokerid,0就表示是Master,>0的都是表示 Slave
    6. brokerId=0
    7. #nameServer地址,分号分割
    8. namesrvAddr=worker1:9876;worker2:9876;worker3:9876
    9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    10. defaultTopicQueueNums=4
    11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    12. autoCreateTopicEnable=true
    13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    14. autoCreateSubscriptionGroup=true
    15. #Broker 对外服务的监听端口
    16. listenPort=10911
    17. #删除文件时间点,默认凌晨 4点
    18. deleteWhen=04
    19. #文件保留时间,默认 48 小时
    20. fileReservedTime=120
    21. #commitLog每个文件的大小默认1G
    22. mapedFileSizeCommitLog=1073741824
    23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    24. mapedFileSizeConsumeQueue=300000
    25. #destroyMapedFileIntervalForcibly=120000
    26. #redeleteHangedFileInterval=120000
    27. #检测物理文件磁盘空间
    28. diskMaxUsedSpaceRatio=88
    29. #存储路径
    30. storePathRootDir=/app/rocketmq/store
    31. #commitLog 存储路径
    32. storePathCommitLog=/app/rocketmq/store/commitlog
    33. #消费队列存储路径存储路径
    34. storePathConsumeQueue=/app/rocketmq/store/consumequeue
    35. #消息索引存储路径
    36. storePathIndex=/app/rocketmq/store/index
    37. #checkpoint 文件存储路径
    38. storeCheckpoint=/app/rocketmq/store/checkpoint
    39. #abort 文件存储路径
    40. abortFile=/app/rocketmq/store/abort
    41. #限制的消息大小
    42. maxMessageSize=65536
    43. #flushCommitLogLeastPages=4
    44. #flushConsumeQueueLeastPages=2
    45. #flushCommitLogThoroughInterval=10000
    46. #flushConsumeQueueThoroughInterval=60000
    47. #Broker 的角色
    48. #- ASYNC_MASTER 异步复制Master
    49. #- SYNC_MASTER 同步双写Master
    50. #- SLAVE
    51. brokerRole=ASYNC_MASTER
    52. #刷盘方式
    53. #- ASYNC_FLUSH 异步刷盘
    54. #- SYNC_FLUSH 同步刷盘
    55. flushDiskType=ASYNC_FLUSH
    56. #checkTransactionMessageEnable=false
    57. #发消息线程池数量
    58. #sendMessageThreadPoolNums=128
    59. #拉消息线程池数量
    60. #pullMessageThreadPoolNums=128

    2: broker-a-s.properties

    1. #所属集群名字,名字一样的节点就在同一个集群内
    2. brokerClusterName=rocketmq-cluster
    3. #broker名字,名字一样的节点就是一组主从节点。
    4. brokerName=broker-a
    5. #brokerid,0就表示是Master,>0的都是表示 Slave
    6. brokerId=1
    7. #nameServer地址,分号分割
    8. namesrvAddr=worker1:9876;worker2:9876;worker3:9876
    9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    10. defaultTopicQueueNums=4
    11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    12. autoCreateTopicEnable=true
    13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    14. autoCreateSubscriptionGroup=true
    15. #Broker 对外服务的监听端口
    16. listenPort=11011
    17. #删除文件时间点,默认凌晨 4点
    18. deleteWhen=04
    19. #文件保留时间,默认 48 小时
    20. fileReservedTime=120
    21. #commitLog每个文件的大小默认1G
    22. mapedFileSizeCommitLog=1073741824
    23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    24. mapedFileSizeConsumeQueue=300000
    25. #destroyMapedFileIntervalForcibly=120000
    26. #redeleteHangedFileInterval=120000
    27. #检测物理文件磁盘空间
    28. diskMaxUsedSpaceRatio=88
    29. #存储路径
    30. storePathRootDir=/app/rocketmq/storeSlave
    31. #commitLog 存储路径
    32. storePathCommitLog=/app/rocketmq/storeSlave/commitlog
    33. #消费队列存储路径存储路径
    34. storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
    35. #消息索引存储路径
    36. storePathIndex=/app/rocketmq/storeSlave/index
    37. #checkpoint 文件存储路径
    38. storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
    39. #abort 文件存储路径
    40. abortFile=/app/rocketmq/storeSlave/abort
    41. #限制的消息大小
    42. maxMessageSize=65536
    43. #flushCommitLogLeastPages=4
    44. #flushConsumeQueueLeastPages=2
    45. #flushCommitLogThoroughInterval=10000
    46. #flushConsumeQueueThoroughInterval=60000
    47. #Broker 的角色
    48. #- ASYNC_MASTER 异步复制Master
    49. #- SYNC_MASTER 同步双写Master
    50. #- SLAVE
    51. brokerRole=SLAVE
    52. #刷盘方式
    53. #- ASYNC_FLUSH 异步刷盘
    54. #- SYNC_FLUSH 同步刷盘
    55. flushDiskType=ASYNC_FLUSH
    56. #checkTransactionMessageEnable=false
    57. #发消息线程池数量
    58. #sendMessageThreadPoolNums=128
    59. #拉消息线程池数量
    60. #pullMessageThreadPoolNums=128

    3:broker-b.properties

    1. #所属集群名字,名字一样的节点就在同一个集群内
    2. brokerClusterName=rocketmq-cluster
    3. #broker名字,名字一样的节点就是一组主从节点。
    4. brokerName=broker-b
    5. #brokerid,0就表示是Master,>0的都是表示 Slave
    6. brokerId=0
    7. #nameServer地址,分号分割
    8. namesrvAddr=worker1:9876;worker2:9876;worker3:9876
    9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    10. defaultTopicQueueNums=4
    11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    12. autoCreateTopicEnable=true
    13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    14. autoCreateSubscriptionGroup=true
    15. #Broker 对外服务的监听端口
    16. listenPort=10911
    17. #删除文件时间点,默认凌晨 4点
    18. deleteWhen=04
    19. #文件保留时间,默认 48 小时
    20. fileReservedTime=120
    21. #commitLog每个文件的大小默认1G
    22. mapedFileSizeCommitLog=1073741824
    23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    24. mapedFileSizeConsumeQueue=300000
    25. #destroyMapedFileIntervalForcibly=120000
    26. #redeleteHangedFileInterval=120000
    27. #检测物理文件磁盘空间
    28. diskMaxUsedSpaceRatio=88
    29. #存储路径
    30. storePathRootDir=/app/rocketmq/store
    31. #commitLog 存储路径
    32. storePathCommitLog=/app/rocketmq/store/commitlog
    33. #消费队列存储路径存储路径
    34. storePathConsumeQueue=/app/rocketmq/store/consumequeue
    35. #消息索引存储路径
    36. storePathIndex=/app/rocketmq/store/index
    37. #checkpoint 文件存储路径
    38. storeCheckpoint=/app/rocketmq/store/checkpoint
    39. #abort 文件存储路径
    40. abortFile=/app/rocketmq/store/abort
    41. #限制的消息大小
    42. maxMessageSize=65536
    43. #flushCommitLogLeastPages=4
    44. #flushConsumeQueueLeastPages=2
    45. #flushCommitLogThoroughInterval=10000
    46. #flushConsumeQueueThoroughInterval=60000
    47. #Broker 的角色
    48. #- ASYNC_MASTER 异步复制Master
    49. #- SYNC_MASTER 同步双写Master
    50. #- SLAVE
    51. brokerRole=ASYNC_MASTER
    52. #刷盘方式
    53. #- ASYNC_FLUSH 异步刷盘
    54. #- SYNC_FLUSH 同步刷盘
    55. flushDiskType=ASYNC_FLUSH
    56. #checkTransactionMessageEnable=false
    57. #发消息线程池数量
    58. #sendMessageThreadPoolNums=128
    59. #拉消息线程池数量
    60. #pullMessageThreadPoolNums=128

    4: borker-b-s.propertiess

    1. #所属集群名字,名字一样的节点就在同一个集群内
    2. brokerClusterName=rocketmq-cluster
    3. #broker名字,名字一样的节点就是一组主从节点。
    4. brokerName=broker-b
    5. #brokerid,0就表示是Master,>0的都是表示 Slave
    6. brokerId=1
    7. #nameServer地址,分号分割
    8. namesrvAddr=worker1:9876;worker2:9876;worker3:9876
    9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    10. defaultTopicQueueNums=4
    11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    12. autoCreateTopicEnable=true
    13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    14. autoCreateSubscriptionGroup=true
    15. #Broker 对外服务的监听端口
    16. listenPort=11011
    17. #删除文件时间点,默认凌晨 4点
    18. deleteWhen=04
    19. #文件保留时间,默认 48 小时
    20. fileReservedTime=120
    21. #commitLog每个文件的大小默认1G
    22. mapedFileSizeCommitLog=1073741824
    23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    24. mapedFileSizeConsumeQueue=300000
    25. #destroyMapedFileIntervalForcibly=120000
    26. #redeleteHangedFileInterval=120000
    27. #检测物理文件磁盘空间
    28. diskMaxUsedSpaceRatio=88
    29. #存储路径
    30. storePathRootDir=/app/rocketmq/storeSlave
    31. #commitLog 存储路径
    32. storePathCommitLog=/app/rocketmq/storeSlave/commitlog
    33. #消费队列存储路径存储路径
    34. storePathConsumeQueue=/app/rocketmq/storeSlave/consumequeue
    35. #消息索引存储路径
    36. storePathIndex=/app/rocketmq/storeSlave/index
    37. #checkpoint 文件存储路径
    38. storeCheckpoint=/app/rocketmq/storeSlave/checkpoint
    39. #abort 文件存储路径
    40. abortFile=/app/rocketmq/storeSlave/abort
    41. #限制的消息大小
    42. maxMessageSize=65536
    43. #flushCommitLogLeastPages=4
    44. #flushConsumeQueueLeastPages=2
    45. #flushCommitLogThoroughInterval=10000
    46. #flushConsumeQueueThoroughInterval=60000
    47. #Broker 的角色
    48. #- ASYNC_MASTER 异步复制Master
    49. #- SYNC_MASTER 同步双写Master
    50. #- SLAVE
    51. brokerRole=SLAVE
    52. #刷盘方式
    53. #- ASYNC_FLUSH 异步刷盘
    54. #- SYNC_FLUSH 同步刷盘
    55. flushDiskType=ASYNC_FLUSH
    56. #checkTransactionMessageEnable=false
    57. #发消息线程池数量
    58. #sendMessageThreadPoolNums=128
    59. #拉消息线程池数量
    60. #pullMessageThreadPoolNums=128

注意点目录不要一样 listenPort不要一样

  1. 分别启动nameserver:
  2. 分别启动broker

    nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties &

控制台rocket-console

https://github.com/apache/rocketmq-externals 中的 rocket-console

修改项目总application.properties文件

rocketmq.config.namesrvAddr=worker1:9876;worker2:9876;worker3:9876

打包启动即可

Mq的需要应对的作用
各个优缺点

复杂性升高
可用性降低
消息一致性问题

各个MQ的比较

集群搭建过过程
nameServer部署,相当于注册中心
broker部署,三个节点

图像化管理界面:rocketmq-console-ng……jar

tools.sh:封装了一些指令

推,拉模式消费消息
messageQueue: 最小队列,在不同的broker上

LitePushConcusmerAssign
topic 跟对对队列的关系

队列最小逻辑单元

局部有序,全局有序

延迟队列实现,开源版本18个队列