参考地址一
参考地址二
rocketmq 的linux部署
rocketmq的linux部署二 看这个
rocketmq-console与rocketmq-dashboard 可视化
jobs -l  查看 所有nohup进程   或者jps
kill -9 进程号
启动jar包
nohup java -jar rocketmq-console-ng-1.0.0.jar >nohup.out 2>&1 &
nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar  >nohup.out 2>&1 &
需要JDK环境
1、安装maven
- 安装包放在usr/local下解压
 
命令: tar zvxf apache-maven-3.6.3-bin.tar.gz
配置环境变量
命令: vim /etc/profile
export MAVEN_HOME=/usr/local/apache-maven-3.6.3export PATH=$PATH:$MAVEN_HOME/bin
保存后刷新配置
source /etc/profile
- 添加权限:
 
chmod a+x /usr/local/apache-maven-3.6.3/bin/mvn
- 注意路径,完毕后输入:
 
mvn -v
2、安装RocketMq
- 把压缩包放入到user/local目录,解压:
 
unzip rocketmq-all-4.9.2-source-release.zip
- 进入到解压后的目录:
 
cd rocketmq-all-4.9.2/
- 使用Maven安装:
 
mvn -Prelease-all -DskipTests clean install -U(需要几分钟时间,耐心等待)
- 安装完毕后,使用xftp查看rocketmq安装后的目录:
 
distribution/target/rocketmq-4.9.2/rocketmq-4.9.2 表明安装成功
打开根目录/etc/profile添加配置:
# rocketmq环境变量配置export ROCKETMQ=/usr/local/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2export PATH=$PATH:$ROCKETMQ/bin
保存后刷新配置:
source /etc/profile
- 添加权限:
 
chmod +x mqnamesrv mqbroker mqshutdown
- 启动前,我们需要创建一下log目录:
- 进入到/root目录,创建log文件夹;
 - 在log文件夹内创建:rocketmqlogs文件夹;
 - touch创建文件
 
 
在rocketmqlogs文件夹内创建namesrv.log文件和broker.log文件;
- 打开rocketmq-4.9.2/bin,找到runserver.sh和runbroker.sh文件,根据自己的内存环境配置
```java
编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小
$ vim bin/runbroker.sh参考设置
JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m” 
$ vim bin/runserver.sh
# 参考设置JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- 打开conf目录下的broker.conf文件,在最后面添加外网IP配置:```javabrokerIP1 = 8.xxx.x.xx
不配置默认是内网IP!
2.1、启动namesrv
mqnamesrv &
启动namesrv成功,按crtl+c退出命令!
2.2、启动broker
mqbroker -n 8.xxx.x.xx:9876 -c /usr/local/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2/conf/broker.conf &
查看启动状态:jps
关闭broker:mqshutdown broker
关闭namesrv:mqshutdown namesrv
3、web可视化
解压源码
tar -zxvf rocketmq-console-1.0.0.tar.gz
重命名
mv rocketmq-externals-rocketmq-console-1.0.0 rocketmq-console
修改端口和rocketmq连接
cd rocketmq-externals-rocketmq-console-1.0.0/rocketmq-console/src/main/resources/vim application.properties
英文输入状态下 按 i 进入insert模式 新增修改如下配置
server.port=8082rocketmq.config.namesrvAddr=localhost:9876
按esc输入 :wq 保存并退出
编译
cd /usr/local/rocketmq-console/rocketmq-console/mvn clean package -Dmaven.test.skip=true
启动
cd target/java -jar rocketmq-console-ng-1.0.0.jar &

访问
- 开放的端口
rocke有9876非vip通道端口:10911vip通道端口:1090910909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口

 
4、rocketMQ角色
- Producer:消息的发送者;举例:发件者
 - Consumer:消息接收者;举例:收件人
 - Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
 - Broker:暂存和传输消息;举例:快递公司
 - NameServer:管理 Broker;举例:快递公司的管理机构
 - Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
 - Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息
 
4.1、broker配置文件详解
broker 默认的配置文件位置在:conf/broker.conf
#所属集群名字brokerClusterName=rocketmq-cluster#broker名字,注意此处不同的配置文件填写的不一样brokerName=broker-a#0 表示 Master,>0 表示 SlavebrokerId=0#nameServer地址,分号分割namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数defaultTopicQueueNums=4#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭autoCreateTopicEnable=true#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭autoCreateSubscriptionGroup=true#Broker 对外服务的监听端口listenPort=10911#删除文件时间点,默认凌晨 4点deleteWhen=04#文件保留时间,默认 48 小时fileReservedTime=120#commitLog每个文件的大小默认1GmapedFileSizeCommitLog=1073741824#ConsumeQueue每个文件默认存30W条,根据业务情况调整mapedFileSizeConsumeQueue=300000#destroyMapedFileIntervalForcibly=120000#redeleteHangedFileInterval=120000#检测物理文件磁盘空间diskMaxUsedSpaceRatio=88#存储路径storePathRootDir=/usr/local/rocketmq/store#commitLog 存储路径storePathCommitLog=/usr/local/rocketmq/store/commitlog#消费队列存储路径存储路径storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue#消息索引存储路径storePathIndex=/usr/local/rocketmq/store/index#checkpoint 文件存储路径storeCheckpoint=/usr/local/rocketmq/store/checkpoint#abort 文件存储路径abortFile=/usr/local/rocketmq/store/abort#限制的消息大小maxMessageSize=65536#flushCommitLogLeastPages=4#flushConsumeQueueLeastPages=2#flushCommitLogThoroughInterval=10000#flushConsumeQueueThoroughInterval=60000#Broker 的角色#- ASYNC_MASTER 异步复制Master#- SYNC_MASTER 同步双写Master#- SLAVEbrokerRole=SYNC_MASTER#刷盘方式#- ASYNC_FLUSH 异步刷盘#- SYNC_FLUSH 同步刷盘flushDiskType=SYNC_FLUSH#checkTransactionMessageEnable=false#发消息线程池数量#sendMessageThreadPoolNums=128#拉消息线程池数量#pullMessageThreadPoolNums=128
4.2、运行流程
- 导入MQ客户端依赖
 
注意:rocketmq-client 的版本,要与 RocketMQ 的版本一致
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.6.0</version></dependency>
消息发送者步骤分析:
- 创建消息生产者 producer,并指定生产者组名
 - 指定 Nameserver 地址
 - 启动 producer
 - 创建消息对象,指定主题 Topic、Tag 和消息体
 - 发送消息
 - 关闭生产者 producer
 
消息消费者步骤分析:
- 创建消费者 Consumer,制定消费者组名
 - 指定 Nameserver 地址
 - 订阅主题 Topic 和 Tag
 - 设置回调函数,处理消息
 - 启动消费者 consumer
 
5、springboot测试案例
pom.xml文件
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version></dependency>
Producer生产者 ```java package com.mq.rocketmq;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
/**
- @Description
 - @Author xinxiaokang
 @Date 2021/12/23 11:02 / @Component public class Producer { /*
生产者的组名 */ @Value(“${apache.rocketmq.producer.producerGroup}”) private String producerGroup;
/**
- NameServer 地址 */ @Value(“${apache.rocketmq.namesrvAddr}”) private String namesrvAddr;
 
public void orderedProducer() throws MQClientException, InterruptedException {/*** 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例* 注意:ProducerGroupName需要由应用来保证唯一* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,* 因为服务器会回查这个Group下的任意一个Producer*/DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr(namesrvAddr);/*** Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法*/producer.start();/*** 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。*/try {for (int i = 0; i < 10; i++) {Message msg = new Message("Topic1",// topic"TagA",// tag"001",// key("Send Msg:Hello MetaQ1").getBytes());// bodySendResult sendResult = producer.send(msg);System.out.println(sendResult);Message msg2 = new Message("Topic2",// topic"TagB",// tag"002",// key("Send Msg:Hello MetaQ2").getBytes());// bodySendResult sendResult2 = producer.send(msg2);System.out.println(sendResult2);Message msg3 = new Message("Topic3",// topic"TagC",// tag"003",// key("Send Msg:Hello MetaQ3").getBytes());// bodySendResult sendResult3 = producer.send(msg3);System.out.println(sendResult3);}} catch (Exception e) {e.printStackTrace();}/*** 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法*/producer.shutdown();}
}
- **Consumer消费者**```javapackage com.mq.rocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import java.util.List;/*** @Description* @Author xinxiaokang* @Date 2021/12/23 11:06*/@Componentpublic class Consumer {/*** 生产者的组名*/@Value("${apache.rocketmq.producer.producerGroup}")private String producerGroup;/*** NameServer 地址*/@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;/*** 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法*/public void orderedConsumer() throws InterruptedException, MQClientException {/*** 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例* 注意:ConsumerGroupName需要由应用来保证唯一*/DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);// consumer.setNamesrvAddr("10.10.0.102:9876");consumer.setNamesrvAddr(namesrvAddr);/*** 订阅指定topic下tags分别等于TagA或TagC或TagD*/consumer.subscribe("Topic1", "TagA || TagC || TagD");/*** 订阅指定topic下所有消息<br>* 注意:一个consumer对象可以订阅多个topic*/consumer.subscribe("Topic2", "*");consumer.subscribe("Topic3", "*");/*** 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.registerMessageListener(new MessageListenerConcurrently() {/*** 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息*/@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);MessageExt msg = msgs.get(0);if (msg.getTopic().equals("Topic1")) {if (null != msg.getTags()) {// 执行Topic1的消费逻辑if (msg.getTags().equals("TagA")) {// 执行TagA的消费System.out.println("TagA开始。");} else if (msg.getTags().equals("TagC")) {System.out.println("TagC开始。");// 执行TagC的消费} else if (msg.getTags().equals("TagD")) {// 执行TagD的消费System.out.println("TagD开始。");}}} else if (msg.getTopic().equals("Topic2")) {// 执行Topic2的消费逻辑System.out.println("Topic2");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});/*** Consumer对象在使用之前必须要调用start初始化,初始化一次即可*/consumer.start();System.out.println("Consumer Started.");}}
- properties配置文件 ```java
 
消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
生产者的组名
apache.rocketmq.producer.producerGroup=Producer
NameServer地址
apache.rocketmq.namesrvAddr=8.142.76.223:9876
设置应用端口
server.port=8089
- **测试代码**```javapackage com.mq.rocketmq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;/*** @Description* @Author xinxiaokang* @Date 2021/12/23 11:13*/@RestControllerpublic class Test {@Autowiredprivate Producer producer;@Autowiredprivate Consumer consumer;@RequestMapping("/test")public String testMQ2() {try {System.out.println("-----------------开始生产-----------------");producer.orderedProducer();System.out.println("-----------------开始消费-----------------");consumer.orderedConsumer();} catch (Exception e) {e.printStackTrace();}return "success";}}






6、springboot测试案例二
引入依赖
<!--注意: 这里的版本,要和部署在服务器上的版本号一致--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.2</version></dependency>
JmsConfig rockerMQ配置类
public class JmsConfig {//RocketMQServer地址public static final String NAME_SERVER = "127.0.0.1:9876";/*** 主题名称*/public static final String TOPIC = "topic_family";}
Producer生产者实体类 ```java @Slf4j @Component public class Producer { private String producerGroup = “test_producer”; private DefaultMQProducer producer;
public Producer(){//示例生产者producer = new DefaultMQProducer(producerGroup);//不开启vip通道 开通口端口会减2producer.setVipChannelEnabled(false);//绑定name serverproducer.setNamesrvAddr(JmsConfig.NAME_SERVER);start();}/*** 对象在使用之前必须要调用一次,只能初始化一次*/public void start(){try {this.producer.start();} catch (MQClientException e) {e.printStackTrace();}}public DefaultMQProducer getProducer(){return this.producer;}/*** 一般在应用上下文,使用上下文监听器,进行关闭*/public void shutdown(){this.producer.shutdown();}
}
- **Consumer消费者实体类**```java@Slf4j@Componentpublic class Consumer {/*** 消费者实体对象*/private DefaultMQPushConsumer consumer;/*** 消费者组*/public static final String CONSUMER_GROUP = "test_consumer";/*** 通过构造函数 实例化对象*/public Consumer() throws MQClientException {consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);//订阅主题和 标签( * 代表所有标签)下信息consumer.subscribe(JmsConfig.TOPIC, "*");// //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// msgs中只收集同一个topic,同一个tag,并且key相同的message// 会把不同的消息分别放置到不同的队列中try {for (Message msg : msgs) {//消费者获取消息 这里只输出 不做后面逻辑处理String body = new String(msg.getBody(), "utf-8");log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);}} catch (UnsupportedEncodingException e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();System.out.println("消费者 启动成功=======");}}
UserController消息验证 ```java @RestController public class UserController {
private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME); @Autowired Producer producer;
@RequestMapping(“/hello”) public void hello()throws Exception{
for(int i=1;i<10;i++){//创建生产信息Message message = new Message(JmsConfig.TOPIC, "testtag", ("Hello World"+i).getBytes());//发送SendResult sendResult = producer.getProducer().send(message);logger.info("输出生产者信息={}",sendResult);}}
}
```
访问:localhost:8080/hello 获取一下结果


