参考地址一
参考地址二
rocketmq 的linux部署
rocketmq的linux部署二 看这个
rocketmq-console与rocketmq-dashboard 可视化

jobs -l 查看 所有nohup进程 或者jps
kill -9 进程号
image.png

启动jar包
nohup java -jar rocketmq-console-ng-1.0.0.jar >nohup.out 2>&1 &
nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar >nohup.out 2>&1 &

需要JDK环境

1、安装maven

官网下载地址

  • 安装包放在usr/local下解压

命令: tar zvxf apache-maven-3.6.3-bin.tar.gz

  • 配置环境变量

    命令: vim /etc/profile

    1. export MAVEN_HOME=/usr/local/apache-maven-3.6.3
    2. export PATH=$PATH:$MAVEN_HOME/bin
  • 保存后刷新配置

source /etc/profile

  • 添加权限:

chmod a+x /usr/local/apache-maven-3.6.3/bin/mvn

  • 注意路径,完毕后输入:

mvn -v
image.png

2、安装RocketMq

下载地址

  • 把压缩包放入到user/local目录,解压:

unzip rocketmq-all-4.9.2-source-release.zip

  • 进入到解压后的目录:

cd rocketmq-all-4.9.2/

  • 使用Maven安装:

mvn -Prelease-all -DskipTests clean install -U(需要几分钟时间,耐心等待)

  • 安装完毕后,使用xftp查看rocketmq安装后的目录:

distribution/target/rocketmq-4.9.2/rocketmq-4.9.2 表明安装成功

  • 打开根目录/etc/profile添加配置:

    1. # rocketmq环境变量配置
    2. export ROCKETMQ=/usr/local/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2
    3. export PATH=$PATH:$ROCKETMQ/bin
  • 保存后刷新配置:

source /etc/profile

  • 添加权限:

chmod +x mqnamesrv mqbroker mqshutdown

  • 启动前,我们需要创建一下log目录:
    • 进入到/root目录,创建log文件夹;
    • 在log文件夹内创建:rocketmqlogs文件夹;
    • touch创建文件

在rocketmqlogs文件夹内创建namesrv.log文件和broker.log文件;
image.png

  • 打开rocketmq-4.9.2/bin,找到runserver.sh和runbroker.sh文件,根据自己的内存环境配置 ```java

    编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小

    $ vim bin/runbroker.sh

    参考设置

    JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m”

$ vim bin/runserver.sh

  1. # 参考设置
  2. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  1. - 打开conf目录下的broker.conf文件,在最后面添加外网IP配置:
  2. ```java
  3. brokerIP1 = 8.xxx.x.xx

不配置默认是内网IP!

2.1、启动namesrv

mqnamesrv &
image.png
启动namesrv成功,按crtl+c退出命令!

2.2、启动broker

mqbroker -n 8.xxx.x.xx:9876 -c /usr/local/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2/conf/broker.conf &
image.png
查看启动状态:jps
image.png
关闭broker:mqshutdown broker
关闭namesrv:mqshutdown namesrv

3、web可视化

下载源码

  • 解压源码

    1. tar -zxvf rocketmq-console-1.0.0.tar.gz
  • 重命名

    1. mv rocketmq-externals-rocketmq-console-1.0.0 rocketmq-console
  • 修改端口和rocketmq连接

    1. cd rocketmq-externals-rocketmq-console-1.0.0/rocketmq-console/src/main/resources/
    2. vim application.properties
  • 英文输入状态下 按 i 进入insert模式 新增修改如下配置

    1. server.port=8082
    2. rocketmq.config.namesrvAddr=localhost:9876

    按esc输入 :wq 保存并退出

  • 编译

    1. cd /usr/local/rocketmq-console/rocketmq-console/
    2. mvn clean package -Dmaven.test.skip=true
  • 启动

    1. cd target/
    2. java -jar rocketmq-console-ng-1.0.0.jar &

    image.png

  • 访问

http://8.142.76.223:8082/#/

页面内容简介
image.png

  • 开放的端口
    1. rocke9876
    2. vip通道端口:10911
    3. vip通道端口:10909
    4. 10909VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口
    image.png

4、rocketMQ角色

  • Producer:消息的发送者;举例:发件者
  • Consumer:消息接收者;举例:收件人
  • Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
  • Broker:暂存和传输消息;举例:快递公司
  • NameServer:管理 Broker;举例:快递公司的管理机构
  • Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
  • Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息



image.png

4.1、broker配置文件详解

broker 默认的配置文件位置在:conf/broker.conf

  1. #所属集群名字
  2. brokerClusterName=rocketmq-cluster
  3. #broker名字,注意此处不同的配置文件填写的不一样
  4. brokerName=broker-a
  5. #0 表示 Master,>0 表示 Slave
  6. brokerId=0
  7. #nameServer地址,分号分割
  8. namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
  9. #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
  10. defaultTopicQueueNums=4
  11. #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
  12. autoCreateTopicEnable=true
  13. #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
  14. autoCreateSubscriptionGroup=true
  15. #Broker 对外服务的监听端口
  16. listenPort=10911
  17. #删除文件时间点,默认凌晨 4
  18. deleteWhen=04
  19. #文件保留时间,默认 48 小时
  20. fileReservedTime=120
  21. #commitLog每个文件的大小默认1G
  22. mapedFileSizeCommitLog=1073741824
  23. #ConsumeQueue每个文件默认存30W条,根据业务情况调整
  24. mapedFileSizeConsumeQueue=300000
  25. #destroyMapedFileIntervalForcibly=120000
  26. #redeleteHangedFileInterval=120000
  27. #检测物理文件磁盘空间
  28. diskMaxUsedSpaceRatio=88
  29. #存储路径
  30. storePathRootDir=/usr/local/rocketmq/store
  31. #commitLog 存储路径
  32. storePathCommitLog=/usr/local/rocketmq/store/commitlog
  33. #消费队列存储路径存储路径
  34. storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
  35. #消息索引存储路径
  36. storePathIndex=/usr/local/rocketmq/store/index
  37. #checkpoint 文件存储路径
  38. storeCheckpoint=/usr/local/rocketmq/store/checkpoint
  39. #abort 文件存储路径
  40. abortFile=/usr/local/rocketmq/store/abort
  41. #限制的消息大小
  42. maxMessageSize=65536
  43. #flushCommitLogLeastPages=4
  44. #flushConsumeQueueLeastPages=2
  45. #flushCommitLogThoroughInterval=10000
  46. #flushConsumeQueueThoroughInterval=60000
  47. #Broker 的角色
  48. #- ASYNC_MASTER 异步复制Master
  49. #- SYNC_MASTER 同步双写Master
  50. #- SLAVE
  51. brokerRole=SYNC_MASTER
  52. #刷盘方式
  53. #- ASYNC_FLUSH 异步刷盘
  54. #- SYNC_FLUSH 同步刷盘
  55. flushDiskType=SYNC_FLUSH
  56. #checkTransactionMessageEnable=false
  57. #发消息线程池数量
  58. #sendMessageThreadPoolNums=128
  59. #拉消息线程池数量
  60. #pullMessageThreadPoolNums=128

4.2、运行流程

  • 导入MQ客户端依赖

注意:rocketmq-client 的版本,要与 RocketMQ 的版本一致

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.6.0</version>
  5. </dependency>
  • 消息发送者步骤分析:

    • 创建消息生产者 producer,并指定生产者组名
    • 指定 Nameserver 地址
    • 启动 producer
    • 创建消息对象,指定主题 Topic、Tag 和消息体
    • 发送消息
    • 关闭生产者 producer
  • 消息消费者步骤分析:

    • 创建消费者 Consumer,制定消费者组名
    • 指定 Nameserver 地址
    • 订阅主题 Topic 和 Tag
    • 设置回调函数,处理消息
    • 启动消费者 consumer


5、springboot测试案例

  • pom.xml文件

    1. <dependency>
    2. <groupId>org.apache.rocketmq</groupId>
    3. <artifactId>rocketmq-spring-boot-starter</artifactId>
    4. <version>2.0.2</version>
    5. </dependency>
  • Producer生产者 ```java package com.mq.rocketmq;

import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;

/**

  • @Description
  • @Author xinxiaokang
  • @Date 2021/12/23 11:02 / @Component public class Producer { /*

    • 生产者的组名 */ @Value(“${apache.rocketmq.producer.producerGroup}”) private String producerGroup;

      /**

    • NameServer 地址 */ @Value(“${apache.rocketmq.namesrvAddr}”) private String namesrvAddr;
  1. public void orderedProducer() throws MQClientException, InterruptedException {
  2. /**
  3. * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
  4. * 注意:ProducerGroupName需要由应用来保证唯一
  5. * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
  6. * 因为服务器会回查这个Group下的任意一个Producer
  7. */
  8. DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
  9. producer.setNamesrvAddr(namesrvAddr);
  10. /**
  11. * Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法
  12. */
  13. producer.start();
  14. /**
  15. * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
  16. * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
  17. * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
  18. * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
  19. */
  20. try {
  21. for (int i = 0; i < 10; i++) {
  22. Message msg = new Message("Topic1",// topic
  23. "TagA",// tag
  24. "001",// key
  25. ("Send Msg:Hello MetaQ1").getBytes());// body
  26. SendResult sendResult = producer.send(msg);
  27. System.out.println(sendResult);
  28. Message msg2 = new Message("Topic2",// topic
  29. "TagB",// tag
  30. "002",// key
  31. ("Send Msg:Hello MetaQ2").getBytes());// body
  32. SendResult sendResult2 = producer.send(msg2);
  33. System.out.println(sendResult2);
  34. Message msg3 = new Message("Topic3",// topic
  35. "TagC",// tag
  36. "003",// key
  37. ("Send Msg:Hello MetaQ3").getBytes());// body
  38. SendResult sendResult3 = producer.send(msg3);
  39. System.out.println(sendResult3);
  40. }
  41. } catch (Exception e) {
  42. e.printStackTrace();
  43. }
  44. /**
  45. * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
  46. * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
  47. */
  48. producer.shutdown();
  49. }

}

  1. - **Consumer消费者**
  2. ```java
  3. package com.mq.rocketmq;
  4. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  5. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
  6. import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
  7. import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
  8. import org.apache.rocketmq.client.exception.MQClientException;
  9. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  10. import org.apache.rocketmq.common.message.MessageExt;
  11. import org.springframework.beans.factory.annotation.Value;
  12. import org.springframework.stereotype.Component;
  13. import java.util.List;
  14. /**
  15. * @Description
  16. * @Author xinxiaokang
  17. * @Date 2021/12/23 11:06
  18. */
  19. @Component
  20. public class Consumer {
  21. /**
  22. * 生产者的组名
  23. */
  24. @Value("${apache.rocketmq.producer.producerGroup}")
  25. private String producerGroup;
  26. /**
  27. * NameServer 地址
  28. */
  29. @Value("${apache.rocketmq.namesrvAddr}")
  30. private String namesrvAddr;
  31. /**
  32. * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
  33. * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
  34. */
  35. public void orderedConsumer() throws InterruptedException, MQClientException {
  36. /**
  37. * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
  38. * 注意:ConsumerGroupName需要由应用来保证唯一
  39. */
  40. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
  41. // consumer.setNamesrvAddr("10.10.0.102:9876");
  42. consumer.setNamesrvAddr(namesrvAddr);
  43. /**
  44. * 订阅指定topic下tags分别等于TagA或TagC或TagD
  45. */
  46. consumer.subscribe("Topic1", "TagA || TagC || TagD");
  47. /**
  48. * 订阅指定topic下所有消息<br>
  49. * 注意:一个consumer对象可以订阅多个topic
  50. */
  51. consumer.subscribe("Topic2", "*");
  52. consumer.subscribe("Topic3", "*");
  53. /**
  54. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
  55. */
  56. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  57. consumer.registerMessageListener(new MessageListenerConcurrently() {
  58. /**
  59. * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
  60. */
  61. @Override
  62. public ConsumeConcurrentlyStatus consumeMessage(
  63. List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  64. System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
  65. MessageExt msg = msgs.get(0);
  66. if (msg.getTopic().equals("Topic1")) {
  67. if (null != msg.getTags()) {
  68. // 执行Topic1的消费逻辑
  69. if (msg.getTags().equals("TagA")) {
  70. // 执行TagA的消费
  71. System.out.println("TagA开始。");
  72. } else if (msg.getTags().equals("TagC")) {
  73. System.out.println("TagC开始。");
  74. // 执行TagC的消费
  75. } else if (msg.getTags().equals("TagD")) {
  76. // 执行TagD的消费
  77. System.out.println("TagD开始。");
  78. }
  79. }
  80. } else if (msg.getTopic().equals("Topic2")) {
  81. // 执行Topic2的消费逻辑
  82. System.out.println("Topic2");
  83. }
  84. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  85. }
  86. });
  87. /**
  88. * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
  89. */
  90. consumer.start();
  91. System.out.println("Consumer Started.");
  92. }
  93. }
  • properties配置文件 ```java

消费者的组名

apache.rocketmq.consumer.PushConsumer=PushConsumer

生产者的组名

apache.rocketmq.producer.producerGroup=Producer

NameServer地址

apache.rocketmq.namesrvAddr=8.142.76.223:9876

设置应用端口

server.port=8089

  1. - **测试代码**
  2. ```java
  3. package com.mq.rocketmq;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. /**
  8. * @Description
  9. * @Author xinxiaokang
  10. * @Date 2021/12/23 11:13
  11. */
  12. @RestController
  13. public class Test {
  14. @Autowired
  15. private Producer producer;
  16. @Autowired
  17. private Consumer consumer;
  18. @RequestMapping("/test")
  19. public String testMQ2() {
  20. try {
  21. System.out.println("-----------------开始生产-----------------");
  22. producer.orderedProducer();
  23. System.out.println("-----------------开始消费-----------------");
  24. consumer.orderedConsumer();
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. return "success";
  29. }
  30. }

image.png

image.png
image.png
image.png
image.png
image.png

6、springboot测试案例二

  • 引入依赖

    1. <!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
    2. <dependency>
    3. <groupId>org.apache.rocketmq</groupId>
    4. <artifactId>rocketmq-client</artifactId>
    5. <version>4.9.2</version>
    6. </dependency>
  • JmsConfig rockerMQ配置类

    1. public class JmsConfig {
    2. //RocketMQServer地址
    3. public static final String NAME_SERVER = "127.0.0.1:9876";
    4. /**
    5. * 主题名称
    6. */
    7. public static final String TOPIC = "topic_family";
    8. }
  • Producer生产者实体类 ```java @Slf4j @Component public class Producer { private String producerGroup = “test_producer”; private DefaultMQProducer producer;

  1. public Producer(){
  2. //示例生产者
  3. producer = new DefaultMQProducer(producerGroup);
  4. //不开启vip通道 开通口端口会减2
  5. producer.setVipChannelEnabled(false);
  6. //绑定name server
  7. producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
  8. start();
  9. }
  10. /**
  11. * 对象在使用之前必须要调用一次,只能初始化一次
  12. */
  13. public void start(){
  14. try {
  15. this.producer.start();
  16. } catch (MQClientException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. public DefaultMQProducer getProducer(){
  21. return this.producer;
  22. }
  23. /**
  24. * 一般在应用上下文,使用上下文监听器,进行关闭
  25. */
  26. public void shutdown(){
  27. this.producer.shutdown();
  28. }

}

  1. - **Consumer消费者实体类**
  2. ```java
  3. @Slf4j
  4. @Component
  5. public class Consumer {
  6. /**
  7. * 消费者实体对象
  8. */
  9. private DefaultMQPushConsumer consumer;
  10. /**
  11. * 消费者组
  12. */
  13. public static final String CONSUMER_GROUP = "test_consumer";
  14. /**
  15. * 通过构造函数 实例化对象
  16. */
  17. public Consumer() throws MQClientException {
  18. consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
  19. consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
  20. //消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
  21. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  22. //订阅主题和 标签( * 代表所有标签)下信息
  23. consumer.subscribe(JmsConfig.TOPIC, "*");
  24. // //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
  25. consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
  26. // msgs中只收集同一个topic,同一个tag,并且key相同的message
  27. // 会把不同的消息分别放置到不同的队列中
  28. try {
  29. for (Message msg : msgs) {
  30. //消费者获取消息 这里只输出 不做后面逻辑处理
  31. String body = new String(msg.getBody(), "utf-8");
  32. log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
  33. }
  34. } catch (UnsupportedEncodingException e) {
  35. e.printStackTrace();
  36. return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  37. }
  38. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  39. });
  40. consumer.start();
  41. System.out.println("消费者 启动成功=======");
  42. }
  43. }
  • UserController消息验证 ```java @RestController public class UserController {

    private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME); @Autowired Producer producer;

    @RequestMapping(“/hello”) public void hello()throws Exception{

  1. for(int i=1;i<10;i++){
  2. //创建生产信息
  3. Message message = new Message(JmsConfig.TOPIC, "testtag", ("Hello World"+i).getBytes());
  4. //发送
  5. SendResult sendResult = producer.getProducer().send(message);
  6. logger.info("输出生产者信息={}",sendResult);
  7. }
  8. }

} ``` 访问:localhost:8080/hello 获取一下结果
image.png