前言
最近在学习消息中间件——RocketMQ,打算把这个学习过程记录下来。此章主要介绍环境搭建。此次主要是单机搭建(条件有限),包括在Windows、Linux环境下的搭建,以及console监控平台搭建,最后加一demo验证一下。
环境准备
在搭建RocketMQ之前,请先确保如下环境已经搭建完毕
- Java环境(我的JDK1.8)
- Maven环境(我的3.6.1目前最新版)
- Git环境
没有搭建的同学走传送门:
JDK环境搭建: JAVA8环境搭建 Maven环境搭建: Windows环境下使用Nexus 3.X 搭建Maven私服及使用介绍 Git环境搭建:Git环境搭建及配置
1. Windows环境下搭建
1.1 下载
官方网站:rocketmq.apache.org/
目前最新版的是V4.5.0,点击进去。
选择下载 rocketmq-all-4.5.0-bin-release.zip。弹出另外一个页面,这里选择rocketmq-all-4.5.0-bin-release.zip进行下载。
下载成功后,选择一个目录放好并解压。
1.2 修改JVM配置
以上操作完毕之后,进入目录bin目录,我这里是 H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin。 找到runserver.cmd和runbroker.cmd中的JAVA_OPT。
原JAVA_OPT:
set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
将 Xms Xmx 这两个值改小一些,改为1g,如:
set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
自己根据虚拟机内存大小设置,超出内存大小可能会报错。
1.3 配置环境变量
上述步骤执行完毕后,我们需要将RocketMQ安装目录的bin目录配置到环境变量中。
1.4 启动
以上配置都完成,接下来就是启动过程。中间有点坑,请务必按步骤安装。
在RocketMQ安装目录的bin目录下,执行命令cmd:
我的目录:
H:\rocketmq\rocketmq-all-4.5.0-bin-release\rocketmq-all-4.5.0-bin-release\bin
可以通过shift+鼠标右击 触发cmd窗口选项。也可以通过win+R 在窗口输入cmd,进入cmd窗口后移动到bin目录下。
1.4.1 启动NAMESERVER
- 执行命令:start mqnamesrv.cmd
成功后会弹出提示框,此框勿关闭。
1.4.3 启动BROKER
- 执行命令:‘start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true’
注意:假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。打开runbroker.cmd,然后将‘%CLASSPATH%’加上英文双引号。
打开 runbroker.cmd 进行修改 原:
set "JAVA_OPT=%JAVA_OPT% -cp %CLASSPATH%"
修改后:
set "JAVA_OPT=%JAVA_OPT% -cp "%CLASSPATH%""
再次执行命令: 启动成功!
这时候一共有三个窗口。
2. 安装Console监控
2.1 下载
下载完后如图所示:选择——>rocketmq-console
2.2 配置
下载完成之后,进入‘rocketmq-externals\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。
2.2 编译启动
进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。中间有个比较慢的下载过程需要等待。
编译成功之后,cmd进入‘target’文件夹,执行‘java -jar rocketmq-console-ng-1.0.1.jar’,启动‘rocketmq-console-ng-1.0.1.jar’。
2.3 查看
访问地址:localhost:8082
2.Linux环境下搭建
2.1 环境准备
- Java环境
- Maven环境
2.1.1 Linux环境搭建Jdk
下载JDK:www.oracle.com/technetwork…
下载需要的版本:
上传到创建的目录/usr/java
解压命令
tar -zxvf jdk-8u181-linux-x64.tar.gz
配置环境变量命令
vim /etc/profileJAVA_HOME=/usr/java/jdk1.8.0_161JRE_HOME=/usr/java/jdk1.8.0_161/jreCLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/libPATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/binexport JAVA_HOME JRE_HOME CLASS_PATH PATHsource /etc/profile
验证是否成功命令
java -version
按照以上操作,完成JDK的安装。接下来安装Maven环境。
2.1.2 Linux环境搭建Maven
- 下载命令:
wget http://mirror.bit.edu.cn/apache/maven/binaries/apache-maven-3.2.2-bin.tar.gz
- 解压命令:
tar -zxvf apache-maven-3.2.2-bin.tar.gz
- 配置Maven环境命令:
vim /etc/profile#配置maven环境变量export MAVEN_HOME=/usr/maven/apache-maven-3.5.4export MAVEN_HOMEexport PATH=$PATH:$MAVEN_HOME/binsource /etc/profile
- 验证是否成功命令:
mvn -v
2.2 下载RocketMQ
- 下载命令:
wget http://mirrors.hust.edu.cn/apache/rocketmq/4.4.0/rocketmq-all-4.4.0-source-release.zip
- 解压命令:
unzip rocketmq-all-4.4.0-source-release.zip
- 构建二进制文件命令
进入解压后的文件目录。
mvn -Prelease-all -DskipTests clean install -U
2.3 修改JVM配置
同Windows环境一样,修改JVM配置。 移动到目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq/bin 中。编辑bin目录下runserver.sh 与 runbroker.sh文件。
根据个人虚拟机大小进行修改
vim runserver.shJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"vim runbroker.shJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"
2.4 配置RocketMQ环境变量
分别执行如下命令:
#修改环境变量vim /etc/profileexport ROCKETMQ=/home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmqexport PATH=$PATH:$ROCKETMQ/bin#更新配置source /etc/profile
2.5 启动NAMESERVER
依然在之前的目录 /home/rocketmq/rocketmq-all-4.4.0/distribution/target/apache-rocketmq
- 执行命令:
##启动命令nohup sh bin/mqnamesrv >/dev/null 2>&1 &##查看日志tail -f ~/logs/rocketmqlogs/namesrv.log
可以看图已经成功了!
2.6 启动BROKER
- 执行命令:
##启动命令nohup sh bin/mqbroker -n localhost:9876 &##查看日志tail -f ~/logs/rocketmqlogs/broker.log
注意防火墙,如果端口连接失败,注意开通。
2.7 关闭命令
sh bin/mqshutdown broker //停止 brokersh bin/mqshutdown namesrv //停止 nameserver
2.8 配置Console监控平台
同Windows平台搭建
2.8.1 启动Console
我这里直接将Windows平台打包好的jar包直接丢到了Linux系统中
- 启动命令:
java -jar rocketmq-console-ng-1.0.1.jar
2.8.2 访问Console管理界面
访问地址:http://192.168.220.72:8082
3. Console监控平台说明
这里不做过多介绍,可以参考以下文章
其他博客地址:guozh.net/rocketmqzhi…
3. 案例测试
案例整合环境:SpringBoot环境 案例来源于网络
3.1 pom.xml文件
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.4.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.coderprogramming.rocketmq</groupId><artifactId>rocketmq</artifactId><version>0.0.1-SNAPSHOT</version><name>rocketmq</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
3.2 Producer生产者
** * @Description: 生产者 * @author Coder编程 * @date 2019/5/8 17:08 */@Componentpublic class Producer {/*** 生产者的组名*/@Value("${apache.rocketmq.producer.producerGroup}")private String producerGroup;/*** NameServer 地址*/@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;public void orderedProducer() throws MQClientException, InterruptedException {/*** 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例* 注意:ProducerGroupName需要由应用来保证唯一* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,* 因为服务器会回查这个Group下的任意一个Producer*/DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);/*** Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法*/producer.start();/*** 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。*/try {for (int i = 0; i < 10; i++) {Message msg = new Message("Topic1",// topic"TagA",// tag"001",// key("Send Msg:Hello MetaQ1").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println(sendResult);Message msg2 = new Message("Topic2",// topic"TagB",// tag"002",// key("Send Msg:Hello MetaQ2").getBytes());// bodySendResult sendResult2 = producer.send(msg2);System.out.println(sendResult2);Message msg3 = new Message("Topic3",// topic"TagC",// tag"003",// key("Send Msg:Hello MetaQ3").getBytes());// bodySendResult sendResult3 = producer.send(msg3);System.out.println(sendResult3);}} catch (Exception e) {e.printStackTrace();}/*** 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法*/producer.shutdown();}}
3.3 Consumer消费者
/** * @Description: 消费者 * @author Coder编程 * @date 2019/5/8 17:08 */@Componentpublic class Consumer {/*** 生产者的组名*/@Value("${apache.rocketmq.producer.producerGroup}")private String producerGroup;/*** NameServer 地址*/@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;/*** 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法*/public void orderedConsumer() throws InterruptedException,MQClientException {/*** 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例* 注意:ConsumerGroupName需要由应用来保证唯一*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);// consumer.setNamesrvAddr("10.10.0.102:9876");consumer.setNamesrvAddr(namesrvAddr);/*** 订阅指定topic下tags分别等于TagA或TagC或TagD*/consumer.subscribe("Topic1", "TagA || TagC || TagD");/*** 订阅指定topic下所有消息<br>* 注意:一个consumer对象可以订阅多个topic*/consumer.subscribe("Topic2", "*");consumer.subscribe("Topic3", "*");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {/*** 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);MessageExt msg = msgs.get(0);if (msg.getTopic().equals("Topic1")) {if (null != msg.getTags()) {// 执行Topic1的消费逻辑if (msg.getTags().equals("TagA")) {// 执行TagA的消费System.out.println("TagA开始。");} else if (msg.getTags().equals("TagC")) {System.out.println("TagC开始。");// 执行TagC的消费} else if (msg.getTags().equals("TagD")) {// 执行TagD的消费System.out.println("TagD开始。");}}} else if (msg.getTopic().equals("Topic2")) {// 执行Topic2的消费逻辑System.out.println("Topic2");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});/*** Consumer对象在使用之前必须要调用start初始化,初始化一次即可*/consumer.start();System.out.println("Consumer Started.");}}
3.3 properties配置文件
# 消费者的组名apache.rocketmq.consumer.PushConsumer=PushConsumer# 生产者的组名apache.rocketmq.producer.producerGroup=Producer# NameServer地址apache.rocketmq.namesrvAddr=192.168.220.72:9876# 设置应用端口server.port=8089
3.4 测试代码
/*** @author Coder编程* @Title: HelloWord* @ProjectName rocketmq* @Description: Hello World* @date 2019/5/814:14*/@RestControllerpublic class Test {@Autowiredprivate Producer producer;@Autowiredprivate Consumer consumer;@RequestMapping("/test")public String testMQ2() {try {System.out.println("-----------------开始生产-----------------");producer.orderedProducer();System.out.println("-----------------开始消费-----------------");consumer.orderedConsumer();} catch (Exception e) {e.printStackTrace();}return "success";}}
4.奉上源码
以上安装jar包和案例测试源码已经上传至GitHub/Gitee
源码地址:
文末
欢迎关注公众号:Coder编程 获取最新原创技术文章和相关免费学习资料,随时随地学习技术知识!
本文转自 https://juejin.cn/post/6844903841201127438,如有侵权,请联系删除。
