前言

最近在学习消息中间件——RocketMQ,打算把这个学习过程记录下来。此章主要介绍环境搭建。此次主要是单机搭建(条件有限),包括在Windows、Linux环境下的搭建,以及console监控平台搭建,最后加一demo验证一下。

环境准备

在搭建RocketMQ之前,请先确保如下环境已经搭建完毕

  • Java环境(我的JDK1.8)
  • Maven环境(我的3.6.1目前最新版)
  • Git环境

没有搭建的同学走传送门:

JDK环境搭建: JAVA8环境搭建 Maven环境搭建: Windows环境下使用Nexus 3.X 搭建Maven私服及使用介绍 Git环境搭建:Git环境搭建及配置


1. Windows环境下搭建

1.1 下载

官方网站:rocketmq.apache.org/

RocketMQ-环境搭建 - 图1

目前最新版的是V4.5.0,点击进去。

RocketMQ-环境搭建 - 图2

选择下载 rocketmq-all-4.5.0-bin-release.zip。弹出另外一个页面,这里选择rocketmq-all-4.5.0-bin-release.zip进行下载。

RocketMQ-环境搭建 - 图3

下载成功后,选择一个目录放好并解压。

RocketMQ-环境搭建 - 图4

1.2 修改JVM配置

以上操作完毕之后,进入目录bin目录,我这里是 H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin。 找到runserver.cmdrunbroker.cmd中的JAVA_OPT。

RocketMQ-环境搭建 - 图5

原JAVA_OPT:

  1. set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

将 Xms Xmx 这两个值改小一些,改为1g,如:

  1. set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

自己根据虚拟机内存大小设置,超出内存大小可能会报错。

1.3 配置环境变量

上述步骤执行完毕后,我们需要将RocketMQ安装目录的bin目录配置到环境变量中。

RocketMQ-环境搭建 - 图6

1.4 启动

以上配置都完成,接下来就是启动过程。中间有点坑,请务必按步骤安装。

在RocketMQ安装目录的bin目录下,执行命令cmd:

我的目录:

  1. H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin

可以通过shift+鼠标右击 触发cmd窗口选项。也可以通过win+R 在窗口输入cmd,进入cmd窗口后移动到bin目录下。

1.4.1 启动NAMESERVER

  • 执行命令:start mqnamesrv.cmd

成功后会弹出提示框,此框勿关闭。

RocketMQ-环境搭建 - 图7

1.4.3 启动BROKER

  • 执行命令:‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’

注意:假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。

RocketMQ-环境搭建 - 图8

打开 runbroker.cmd 进行修改 原:

  1. set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"

修改后:

  1. set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""

RocketMQ-环境搭建 - 图9

再次执行命令: 启动成功!

RocketMQ-环境搭建 - 图10

这时候一共有三个窗口。


2. 安装Console监控

2.1 下载

下载地址:github.com/apache/rock…

下载完后如图所示:选择——>rocketmq-console

RocketMQ-环境搭建 - 图11

2.2 配置

下载完成之后,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。

RocketMQ-环境搭建 - 图12

RocketMQ-环境搭建 - 图13

2.2 编译启动

进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。中间有个比较慢的下载过程需要等待。

RocketMQ-环境搭建 - 图14

编译成功之后,cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.1.jar’,启动‘rocketmq-console-ng-1.0.1.jar’。

RocketMQ-环境搭建 - 图15

RocketMQ-环境搭建 - 图16

2.3 查看

访问地址:localhost:8082

RocketMQ-环境搭建 - 图17

2.Linux环境下搭建

2.1 环境准备

  • Java环境
  • Maven环境

2.1.1 Linux环境搭建Jdk

下载JDK:www.oracle.com/technetwork…

下载需要的版本:

RocketMQ-环境搭建 - 图18

上传到创建的目录/usr/java

解压命令

  1. tar -zxvf jdk-8u181-linux-x64.tar.gz

配置环境变量命令

  1. vim /etc/profile
  2. JAVA_HOME=/usr/java/jdk1.8.0_161
  3. JRE_HOME=/usr/java/jdk1.8.0_161/jre
  4. CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
  5. PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
  6. export JAVA_HOME JRE_HOME CLASS_PATH PATH
  7. source /etc/profile

验证是否成功命令

  1. java -version

RocketMQ-环境搭建 - 图19

按照以上操作,完成JDK的安装。接下来安装Maven环境。

2.1.2 Linux环境搭建Maven

  1. 下载命令:
  1. wget http://mirror.bit.edu.cn/apache/maven/binaries/apache-maven-3.2.2-bin.tar.gz
  1. 解压命令:
  1. tar -zxvf apache-maven-3.2.2-bin.tar.gz
  1. 配置Maven环境命令:
  1. vim /etc/profile
  2. #配置maven环境变量
  3. export MAVEN_HOME=/usr/maven/apache-maven-3.5.4
  4. export MAVEN_HOME
  5. export PATH=$PATH:$MAVEN_HOME/bin
  6. source /etc/profile
  1. 验证是否成功命令:
  1. mvn -v

RocketMQ-环境搭建 - 图20

2.2 下载RocketMQ

  1. 下载命令:
  1. wget http://mirrors.hust.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-source-release.zip
  1. 解压命令:
  1. unzip rocketmq-all-4.4.0-source-release.zip

RocketMQ-环境搭建 - 图21

  1. 构建二进制文件命令

进入解压后的文件目录。

  1. mvn -Prelease-all -DskipTests clean install -U

RocketMQ-环境搭建 - 图22

2.3 修改JVM配置

同Windows环境一样,修改JVM配置。 移动到目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/bin 中。编辑bin目录下runserver.shrunbroker.sh文件。

根据个人虚拟机大小进行修改

  1. vim runserver.sh
  2. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
  3. vim runbroker.sh
  4. JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"

RocketMQ-环境搭建 - 图23

2.4 配置RocketMQ环境变量

分别执行如下命令:

  1. #修改环境变量
  2. vim /etc/profile
  3. export ROCKETMQ=/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
  4. export PATH=$PATH:$ROCKETMQ/bin
  5. #更新配置
  6. source /etc/profile

RocketMQ-环境搭建 - 图24

2.5 启动NAMESERVER

依然在之前的目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq

  • 执行命令:
  1. ##启动命令
  2. nohup sh bin/mqnamesrv >/dev/null 2>&1 &
  3. ##查看日志
  4. tail -f ~/logs/rocketmqlogs/namesrv.log

RocketMQ-环境搭建 - 图25

可以看图已经成功了!

2.6 启动BROKER

  • 执行命令:
  1. ##启动命令
  2. nohup sh bin/mqbroker -n localhost:9876 &
  3. ##查看日志
  4. tail -f ~/logs/rocketmqlogs/broker.log

RocketMQ-环境搭建 - 图26

注意防火墙,如果端口连接失败,注意开通。

2.7 关闭命令

  1. sh bin/mqshutdown broker //停止 broker
  2. sh bin/mqshutdown namesrv //停止 nameserver

2.8 配置Console监控平台

同Windows平台搭建

2.8.1 启动Console

我这里直接将Windows平台打包好的jar包直接丢到了Linux系统中

  • 启动命令:
  1. java -jar rocketmq-console-ng-1.0.1.jar

RocketMQ-环境搭建 - 图27

2.8.2 访问Console管理界面

访问地址:http://192.168.220.72:8082

RocketMQ-环境搭建 - 图28


3. Console监控平台说明

这里不做过多介绍,可以参考以下文章

官网地址:github.com/apache/rock…

其他博客地址:guozh.net/rocketmqzhi…

3. 案例测试

案例整合环境:SpringBoot环境 案例来源于网络

3.1 pom.xml文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.1.4.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.coderprogramming.rocketmq</groupId>
  12. <artifactId>rocketmq</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>rocketmq</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. <scope>test</scope>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.apache.rocketmq</groupId>
  31. <artifactId>rocketmq-spring-boot-starter</artifactId>
  32. <version>2.0.2</version>
  33. </dependency>
  34. </dependencies>
  35. <build>
  36. <plugins>
  37. <plugin>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-maven-plugin</artifactId>
  40. </plugin>
  41. </plugins>
  42. </build>
  43. </project>

3.2 Producer生产者

  1. **
  2. &emsp;* @Description: 生产者
  3. &emsp;* @author Coder编程
  4. &emsp;* @date 2019/5/8 17:08
  5. &emsp;*/
  6. @Component
  7. public class Producer {
  8. /**
  9. * 生产者的组名
  10. */
  11. @Value("${apache.rocketmq.producer.producerGroup}")
  12. private String producerGroup;
  13. /**
  14. * NameServer 地址
  15. */
  16. @Value("${apache.rocketmq.namesrvAddr}")
  17. private String namesrvAddr;
  18. public void orderedProducer() throws MQClientException, InterruptedException {
  19. /**
  20. * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
  21. * 注意:ProducerGroupName需要由应用来保证唯一
  22. * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
  23. * 因为服务器会回查这个Group下的任意一个Producer
  24. */
  25. DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
  26. producer.setNamesrvAddr(namesrvAddr);
  27. /**
  28. * Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法
  29. */
  30. producer.start();
  31. /**
  32. * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
  33. * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
  34. * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
  35. * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
  36. */
  37. try {
  38. for (int i = 0; i < 10; i++) {
  39. Message msg = new Message("Topic1",// topic
  40. "TagA",// tag
  41. "001",// key
  42. ("Send Msg:Hello MetaQ1").getBytes());// body
  43. SendResult sendResult = producer.send(msg);
  44. System.out.println(sendResult);
  45. Message msg2 = new Message("Topic2",// topic
  46. "TagB",// tag
  47. "002",// key
  48. ("Send Msg:Hello MetaQ2").getBytes());// body
  49. SendResult sendResult2 = producer.send(msg2);
  50. System.out.println(sendResult2);
  51. Message msg3 = new Message("Topic3",// topic
  52. "TagC",// tag
  53. "003",// key
  54. ("Send Msg:Hello MetaQ3").getBytes());// body
  55. SendResult sendResult3 = producer.send(msg3);
  56. System.out.println(sendResult3);
  57. }
  58. } catch (Exception e) {
  59. e.printStackTrace();
  60. }
  61. /**
  62. * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
  63. * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
  64. */
  65. producer.shutdown();
  66. }
  67. }

3.3 Consumer消费者

  1. /**
  2. &emsp;* @Description: 消费者
  3. &emsp;* @author Coder编程
  4. &emsp;* @date 2019/5/8 17:08
  5. &emsp;*/
  6. @Component
  7. public class Consumer {
  8. /**
  9. * 生产者的组名
  10. */
  11. @Value("${apache.rocketmq.producer.producerGroup}")
  12. private String producerGroup;
  13. /**
  14. * NameServer 地址
  15. */
  16. @Value("${apache.rocketmq.namesrvAddr}")
  17. private String namesrvAddr;
  18. /**
  19. * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
  20. * 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
  21. */
  22. public void orderedConsumer() throws InterruptedException,MQClientException {
  23. /**
  24. * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
  25. * 注意:ConsumerGroupName需要由应用来保证唯一
  26. */
  27. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
  28. // consumer.setNamesrvAddr("10.10.0.102:9876");
  29. consumer.setNamesrvAddr(namesrvAddr);
  30. /**
  31. * 订阅指定topic下tags分别等于TagA或TagC或TagD
  32. */
  33. consumer.subscribe("Topic1", "TagA || TagC || TagD");
  34. /**
  35. * 订阅指定topic下所有消息<br>
  36. * 注意:一个consumer对象可以订阅多个topic
  37. */
  38. consumer.subscribe("Topic2", "*");
  39. consumer.subscribe("Topic3", "*");
  40. /**
  41. * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
  42. */
  43. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  44. consumer.registerMessageListener(new MessageListenerConcurrently() {
  45. /**
  46. * 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
  47. */
  48. @Override
  49. public ConsumeConcurrentlyStatus consumeMessage(
  50. List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  51. System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
  52. MessageExt msg = msgs.get(0);
  53. if (msg.getTopic().equals("Topic1")) {
  54. if (null != msg.getTags()) {
  55. // 执行Topic1的消费逻辑
  56. if (msg.getTags().equals("TagA")) {
  57. // 执行TagA的消费
  58. System.out.println("TagA开始。");
  59. } else if (msg.getTags().equals("TagC")) {
  60. System.out.println("TagC开始。");
  61. // 执行TagC的消费
  62. } else if (msg.getTags().equals("TagD")) {
  63. // 执行TagD的消费
  64. System.out.println("TagD开始。");
  65. }
  66. }
  67. } else if (msg.getTopic().equals("Topic2")) {
  68. // 执行Topic2的消费逻辑
  69. System.out.println("Topic2");
  70. }
  71. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  72. }
  73. });
  74. /**
  75. * Consumer对象在使用之前必须要调用start初始化,初始化一次即可
  76. */
  77. consumer.start();
  78. System.out.println("Consumer Started.");
  79. }
  80. }

3.3 properties配置文件

  1. # 消费者的组名
  2. apache.rocketmq.consumer.PushConsumer=PushConsumer
  3. # 生产者的组名
  4. apache.rocketmq.producer.producerGroup=Producer
  5. # NameServer地址
  6. apache.rocketmq.namesrvAddr=192.168.220.72:9876
  7. # 设置应用端口
  8. server.port=8089

3.4 测试代码

  1. /**
  2. * @author Coder编程
  3. * @Title: HelloWord
  4. * @ProjectName rocketmq
  5. * @Description: Hello World
  6. * @date 2019/5/814:14
  7. */
  8. @RestController
  9. public class Test {
  10. @Autowired
  11. private Producer producer;
  12. @Autowired
  13. private Consumer consumer;
  14. @RequestMapping("/test")
  15. public String testMQ2() {
  16. try {
  17. System.out.println("-----------------开始生产-----------------");
  18. producer.orderedProducer();
  19. System.out.println("-----------------开始消费-----------------");
  20. consumer.orderedConsumer();
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. return "success";
  25. }
  26. }

4.奉上源码

以上安装jar包和案例测试源码已经上传至GitHub/Gitee

RocketMQ-环境搭建 - 图29

源码地址:

Github地址

Gitee地址

文末

欢迎关注公众号:Coder编程 获取最新原创技术文章和相关免费学习资料,随时随地学习技术知识!

RocketMQ-环境搭建 - 图30

RocketMQ-环境搭建 - 图31

本文转自 https://juejin.cn/post/6844903841201127438,如有侵权,请联系删除。