参考地址一
参考地址二
rocketmq 的linux部署
rocketmq的linux部署二 看这个
rocketmq-console与rocketmq-dashboard 可视化
jobs -l 查看 所有nohup进程 或者jps
kill -9 进程号
启动jar包
nohup java -jar rocketmq-console-ng-1.0.0.jar >nohup.out 2>&1 &
nohup java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar >nohup.out 2>&1 &
需要JDK环境
1、安装maven
- 安装包放在usr/local下解压
命令: tar zvxf apache-maven-3.6.3-bin.tar.gz
配置环境变量
命令: vim /etc/profile
export MAVEN_HOME=/usr/local/apache-maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin
保存后刷新配置
source /etc/profile
- 添加权限:
chmod a+x /usr/local/apache-maven-3.6.3/bin/mvn
- 注意路径,完毕后输入:
mvn -v
2、安装RocketMq
- 把压缩包放入到user/local目录,解压:
unzip rocketmq-all-4.9.2-source-release.zip
- 进入到解压后的目录:
cd rocketmq-all-4.9.2/
- 使用Maven安装:
mvn -Prelease-all -DskipTests clean install -U(需要几分钟时间,耐心等待)
- 安装完毕后,使用xftp查看rocketmq安装后的目录:
distribution/target/rocketmq-4.9.2/rocketmq-4.9.2 表明安装成功
打开根目录/etc/profile添加配置:
# rocketmq环境变量配置
export ROCKETMQ=/usr/local/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2
export PATH=$PATH:$ROCKETMQ/bin
保存后刷新配置:
source /etc/profile
- 添加权限:
chmod +x mqnamesrv mqbroker mqshutdown
- 启动前,我们需要创建一下log目录:
- 进入到/root目录,创建log文件夹;
- 在log文件夹内创建:rocketmqlogs文件夹;
- touch创建文件
在rocketmqlogs文件夹内创建namesrv.log文件和broker.log文件;
- 打开rocketmq-4.9.2/bin,找到runserver.sh和runbroker.sh文件,根据自己的内存环境配置
```java
编辑 runbroker.sh 和 runserver.sh 修改默认 JVM 大小
$ vim bin/runbroker.sh参考设置
JAVA_OPT=”${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m”
$ vim bin/runserver.sh
# 参考设置
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- 打开conf目录下的broker.conf文件,在最后面添加外网IP配置:
```java
brokerIP1 = 8.xxx.x.xx
不配置默认是内网IP!
2.1、启动namesrv
mqnamesrv &
启动namesrv成功,按crtl+c退出命令!
2.2、启动broker
mqbroker -n 8.xxx.x.xx:9876 -c /usr/local/rocketmq-all-4.9.2/distribution/target/rocketmq-4.9.2/rocketmq-4.9.2/conf/broker.conf &
查看启动状态:jps
关闭broker:mqshutdown broker
关闭namesrv:mqshutdown namesrv
3、web可视化
解压源码
tar -zxvf rocketmq-console-1.0.0.tar.gz
重命名
mv rocketmq-externals-rocketmq-console-1.0.0 rocketmq-console
修改端口和rocketmq连接
cd rocketmq-externals-rocketmq-console-1.0.0/rocketmq-console/src/main/resources/
vim application.properties
英文输入状态下 按 i 进入insert模式 新增修改如下配置
server.port=8082
rocketmq.config.namesrvAddr=localhost:9876
按esc输入 :wq 保存并退出
编译
cd /usr/local/rocketmq-console/rocketmq-console/
mvn clean package -Dmaven.test.skip=true
启动
cd target/
java -jar rocketmq-console-ng-1.0.0.jar &
访问
- 开放的端口
rocke有9876
非vip通道端口:10911
vip通道端口:10909
10909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口
4、rocketMQ角色
- Producer:消息的发送者;举例:发件者
- Consumer:消息接收者;举例:收件人
- Consumer Group:消费组;每一个 consumer 实例都属于一个 consumer group,每一条消息只会被同一个 consumer group 里的一个 consumer 实例消费。(不同consumer group可以同时消费同一条消息)
- Broker:暂存和传输消息;举例:快递公司
- NameServer:管理 Broker;举例:快递公司的管理机构
- Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
- Message Queue:相当于是 Topic 的分区;用于并行发送和接收消息
4.1、broker配置文件详解
broker 默认的配置文件位置在:conf/broker.conf
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
4.2、运行流程
- 导入MQ客户端依赖
注意:rocketmq-client 的版本,要与 RocketMQ 的版本一致
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.6.0</version>
</dependency>
消息发送者步骤分析:
- 创建消息生产者 producer,并指定生产者组名
- 指定 Nameserver 地址
- 启动 producer
- 创建消息对象,指定主题 Topic、Tag 和消息体
- 发送消息
- 关闭生产者 producer
消息消费者步骤分析:
- 创建消费者 Consumer,制定消费者组名
- 指定 Nameserver 地址
- 订阅主题 Topic 和 Tag
- 设置回调函数,处理消息
- 启动消费者 consumer
5、springboot测试案例
pom.xml文件
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
Producer生产者 ```java package com.mq.rocketmq;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;
/**
- @Description
- @Author xinxiaokang
@Date 2021/12/23 11:02 / @Component public class Producer { /*
生产者的组名 */ @Value(“${apache.rocketmq.producer.producerGroup}”) private String producerGroup;
/**
- NameServer 地址 */ @Value(“${apache.rocketmq.namesrvAddr}”) private String namesrvAddr;
public void orderedProducer() throws MQClientException, InterruptedException {
/**
* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ProducerGroupName需要由应用来保证唯一
* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
* 因为服务器会回查这个Group下的任意一个Producer
*/
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
/**
* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态
* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,
* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
*/
try {
for (int i = 0; i < 10; i++) {
Message msg = new Message("Topic1",// topic
"TagA",// tag
"001",// key
("Send Msg:Hello MetaQ1").getBytes());// body
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
Message msg2 = new Message("Topic2",// topic
"TagB",// tag
"002",// key
("Send Msg:Hello MetaQ2").getBytes());// body
SendResult sendResult2 = producer.send(msg2);
System.out.println(sendResult2);
Message msg3 = new Message("Topic3",// topic
"TagC",// tag
"003",// key
("Send Msg:Hello MetaQ3").getBytes());// body
SendResult sendResult3 = producer.send(msg3);
System.out.println(sendResult3);
}
} catch (Exception e) {
e.printStackTrace();
}
/**
* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
*/
producer.shutdown();
}
}
- **Consumer消费者**
```java
package com.mq.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @Description
* @Author xinxiaokang
* @Date 2021/12/23 11:06
*/
@Component
public class Consumer {
/**
* 生产者的组名
*/
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
/**
* NameServer 地址
*/
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
/**
* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。
* 但是实际PushConsumer内部是使用长轮询Pull方式从Broker拉消息,然后再回调用户Listener方法
*/
public void orderedConsumer() throws InterruptedException, MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(producerGroup);
// consumer.setNamesrvAddr("10.10.0.102:9876");
consumer.setNamesrvAddr(namesrvAddr);
/**
* 订阅指定topic下tags分别等于TagA或TagC或TagD
*/
consumer.subscribe("Topic1", "TagA || TagC || TagD");
/**
* 订阅指定topic下所有消息<br>
* 注意:一个consumer对象可以订阅多个topic
*/
consumer.subscribe("Topic2", "*");
consumer.subscribe("Topic3", "*");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals("Topic1")) {
if (null != msg.getTags()) {
// 执行Topic1的消费逻辑
if (msg.getTags().equals("TagA")) {
// 执行TagA的消费
System.out.println("TagA开始。");
} else if (msg.getTags().equals("TagC")) {
System.out.println("TagC开始。");
// 执行TagC的消费
} else if (msg.getTags().equals("TagD")) {
// 执行TagD的消费
System.out.println("TagD开始。");
}
}
} else if (msg.getTopic().equals("Topic2")) {
// 执行Topic2的消费逻辑
System.out.println("Topic2");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可
*/
consumer.start();
System.out.println("Consumer Started.");
}
}
- properties配置文件 ```java
消费者的组名
apache.rocketmq.consumer.PushConsumer=PushConsumer
生产者的组名
apache.rocketmq.producer.producerGroup=Producer
NameServer地址
apache.rocketmq.namesrvAddr=8.142.76.223:9876
设置应用端口
server.port=8089
- **测试代码**
```java
package com.mq.rocketmq;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Description
* @Author xinxiaokang
* @Date 2021/12/23 11:13
*/
@RestController
public class Test {
@Autowired
private Producer producer;
@Autowired
private Consumer consumer;
@RequestMapping("/test")
public String testMQ2() {
try {
System.out.println("-----------------开始生产-----------------");
producer.orderedProducer();
System.out.println("-----------------开始消费-----------------");
consumer.orderedConsumer();
} catch (Exception e) {
e.printStackTrace();
}
return "success";
}
}
6、springboot测试案例二
引入依赖
<!--注意: 这里的版本,要和部署在服务器上的版本号一致-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
JmsConfig rockerMQ配置类
public class JmsConfig {
//RocketMQServer地址
public static final String NAME_SERVER = "127.0.0.1:9876";
/**
* 主题名称
*/
public static final String TOPIC = "topic_family";
}
Producer生产者实体类 ```java @Slf4j @Component public class Producer { private String producerGroup = “test_producer”; private DefaultMQProducer producer;
public Producer(){
//示例生产者
producer = new DefaultMQProducer(producerGroup);
//不开启vip通道 开通口端口会减2
producer.setVipChannelEnabled(false);
//绑定name server
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
start();
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown(){
this.producer.shutdown();
}
}
- **Consumer消费者实体类**
```java
@Slf4j
@Component
public class Consumer {
/**
* 消费者实体对象
*/
private DefaultMQPushConsumer consumer;
/**
* 消费者组
*/
public static final String CONSUMER_GROUP = "test_consumer";
/**
* 通过构造函数 实例化对象
*/
public Consumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
//消费模式:一个新的订阅组第一次启动从队列的最后位置开始消费 后续再启动接着上次消费的进度开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅主题和 标签( * 代表所有标签)下信息
consumer.subscribe(JmsConfig.TOPIC, "*");
// //注册消费的监听 并在此监听中消费信息,并返回消费的状态信息
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// msgs中只收集同一个topic,同一个tag,并且key相同的message
// 会把不同的消息分别放置到不同的队列中
try {
for (Message msg : msgs) {
//消费者获取消息 这里只输出 不做后面逻辑处理
String body = new String(msg.getBody(), "utf-8");
log.info("Consumer-获取消息-主题topic为={}, 消费消息为={}", msg.getTopic(), body);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("消费者 启动成功=======");
}
}
UserController消息验证 ```java @RestController public class UserController {
private static Logger logger = LogManager.getLogger(LogManager.ROOT_LOGGER_NAME); @Autowired Producer producer;
@RequestMapping(“/hello”) public void hello()throws Exception{
for(int i=1;i<10;i++){
//创建生产信息
Message message = new Message(JmsConfig.TOPIC, "testtag", ("Hello World"+i).getBytes());
//发送
SendResult sendResult = producer.getProducer().send(message);
logger.info("输出生产者信息={}",sendResult);
}
}
}
```
访问:localhost:8080/hello 获取一下结果