1) 概念
1.1) 何为MQ
- MQ(Message Queue)消息队列,是一种用来保存消息数据的队列
- 消息(Message): 服务器间的业务请求
- 队列(Queue):数据结构的一种,特征为 “先进先出”
1.2) MQ的作用/优点
- 应用解耦(异步消息发送)
- 快速应用变更维护(异步消息发送)
流量削峰(削峰填谷)(异步消息发送)
系统可用性降低
- 系统复杂度提高
异步消息机制
ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高
- RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,
- RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强
- kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多
1.5) RocketMQ
1.5.1) 基本概念
- RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)
- 解决所有缺点
1.5.2)名词解释
- topic
- 消息主题,一级消息类型,通过Topic对消息进行分类
- 消息(Message)
- 消息队列中信息传递的载体。
- Tag
- 消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
- Producer
- 消息生产者,也称为消息发布者,负责生产并发送消息
- Producer实例
- Producer的一个对象实例,不同的Producer实例可以运行在不同进程内或者不同机器上。Producer实例线程安全,可在同一进程内多线程之间共享。
- Consumer
- 消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:
- Push Consumer:消息由消息队列RocketMQ版推送至Consumer。
- Pull Consumer:该类Consumer主动从消息队列RocketMQ版拉取消息。目前仅TCP Java SDK支持该类Consumer
- 消息消费者,也称为消息订阅者,负责接收并消费消息。可分为两类:
- Consumer实例
- Consumer的一个对象实例,不同的Consumer实例可以运行在不同进程内或者不同机器上。一个Consumer实例内配置线程池消费消息。
- Group
- 一类Producer或Consumer,这类Producer或Consumer通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
Group ID
步骤1:安装JDK(1.8)
1)解压 jdk
tar -zxvf jdk-8u171-linux-x64.tar.gz
2)配置环境变量
>vim /etc/profile
export JAVA_HOME=/opt/jdk1.8.0_171
export PATH=$PATH:${JAVA_HOME}/bin
3)重新加载配置
>source /etc/profile
>java -version
如果安装完毕 jdk 后 java -version 看到的是 openjdk(需要删除)因为 操作系统默认已经安装了 opendjdk
# 查看 rpm -qa | grep java # 删除(把上一个命令看到的所有的jdk文件 用 如下命令删除) rpm -e --nodeps java-1.8.0-openjdk-1.8.0.232.b09-0.el7_7.x86_64 rpm -e --nodeps java-1.8.0-openjdk-headless-1.8.0.232.b09-0.el7_7.x86_64 rpm -e --nodeps java-1.7.0-openjdk-headless-1.7.0.241-2.6.20.0.el7_7.x86_64 rmp -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64 rpm -e --nodeps java-1.7.0-openjdk-1.7.0.241-2.6.20.0.el7_7.x86_64
步骤2:上传压缩包(zip)
yum -y install lrzsz
rz
- 步骤3:解压缩
unzip rocketmq-all-4.5.2-bin-release.zip
步骤4:修改目录名称
mv rocketmq-all-4.5.2-bin-release rocketmq
2.1.2) 启动服务器
步骤1:启动命名服务器(bin目录下)
sh mqnamesrv
- 步骤2:启动消息服务器(bin目录下)
- 修改
_runbroker.sh_
文件中有关内存的配置(调整的与当前虚拟机内存匹配即可,推荐256m-128m)
- 修改
- 步骤1:配置命名服务器地址
export NAMESRV_ADDR=localhost:9876
- 步骤2:启动生产者程序客户端(bin目录下)
sh tools.sh org.apache.rocketmq.example.quickstart.Producer
- 启动后产生大量日志信息(注意该信息是测试程序中自带的,不具有通用性,仅供学习查阅参考)
- 步骤3:启动消费者程序客户端(bin目录下)
- 谁来发?
- 发给谁?
- 怎么发?
- 发什么?
- 发的结果是什么?
- 打扫战场
3.1) 基于Java环境构建消息发送与消息接收基础程序
3.1.1) 单生产者单消费者(OneToOne)
生产者
步骤1:导入坐标
步骤2:编写发送消息的程序 ```java package com.itheima.base;<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency> </dependencies>
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;
/**
- 生产者,生产消息 *
- @author liqp
@version 1.0 */ public class Producer { public static void main(String[] args) throws Exception {
/* 谁来发? 发给谁? 怎么发? 发什么? 发的结果是什么? 打扫战场 */ //1.创建一个发送消息的对象 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.设定发送的命名服务器地址 producer.setNamesrvAddr("192.168.200.169:9876"); //3.1 启动发送的服务 producer.start(); //4. 创建要发送的消息对象,指定topic,指定消息内容 Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8")); //3.2 发送消息,获取发送结果 SendResult result = producer.send(msg); System.out.println("返回结果" + result); //5. 关闭连接 producer.shutdown();
} }
<a name="286s9"></a>
#### 消费者
```java
package com.itheima.base;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 消费者,消费消息
*
* @author liqp
* @version 1.0
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("192.168.200.169:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意*
consumer.subscribe("topic1", "*");
//4.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("消息" + new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动接收消息的服务
consumer.start();
System.out.println("接收消息服务已开启运行");
}
}
3.1.2) 单生产者多消费者(OneToMany)
生产者(发送多条消息)
//1.创建一个发送消息的对象
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("192.168.200.169:9876");
//3.1 启动发送的服务
producer.start();
for (int i = 0; i < 10; i++) {
//4. 创建要发送的消息对象,指定topic,指定消息内容
Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8"));
//3.2 发送消息,获取发送结果
SendResult result = producer.send(msg);
System.out.println("返回结果" + result);
}
//5. 关闭连接
producer.shutdown();
消费者(多消费者)
负载均衡模式/集群模式(默认)
consumer.setMessageModel(MessageModel.CLUSTERING);
广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
3.1.3) 多生产者多消费者(ManyToMany)
多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费
3.2) 发送不同类型的消息
3.2.1) 同步消息
即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
SendResult result = producer.send(msg);
3.2.2) 异步消息
即时性较弱,但需要有回执的消息,例如订单中的某些信息
for (int i = 0; i < 10; i++) {
Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes(StandardCharsets.UTF_8));
//同步消息
/*SendResult result = producer.send(msg);
System.out.println("返回结果" + result);*/
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
@Override
public void onException(Throwable t) {
System.out.println(t);
}
});
}
TimeUnit.SECONDS.sleep(10);
3.2.3) 单向消息
不需要有回执的消息,例如日志类消息
producer.sendOneway(msg);
3.3) 特殊的消息发送
3.3.1) 延时消息
消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用
Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8")); //设置当前消息的延时效果 msg.setDelayTimeLevel(3); SendResult result = producer.send(msg); System.out.println("返回结果:"+result);
目前支持的消息时间
发送批量消息
List<Message> msgList = new ArrayList<Message>(); SendResult send = producer.send(msgList);
消息内容总长度不超过4M
消息内容总长度包含如下:
- topic(字符串字节数)
- body (字节数组长度)
- 消息追加的属性(key与value对应字符串字节数)
- 日志(固定20字节)
3.4) 特殊的消息接收
3.4.1) 消息过滤
tag过滤/分类过滤
生产者
Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));
消费者
//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag, 通过 || 分割可以指定多个tag consumer.subscribe("topic6","tag1 || tag2");
sql过滤/属性过滤/语法过滤
设置broker支持sql过滤
vim /root/rocket/conf/broker.conf
- 在文件内追加内容
enablePropertyFilter=true
- 重启服务
Order.java
package com.itheima.order.domain;
public class Order {
private String id;
private String msg;
@Override
public String toString() {
return "Order{ id='" + id + ", msg='" + msg + '}';
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("192.168.200.169:9876");
producer.start();
//创建要执行的业务队列
List<Order> orderList = new ArrayList<Order>();
Order order11 = new Order();
order11.setId("a");
order11.setMsg("主单-1");
orderList.add(order11);
Order order12 = new Order();
order12.setId("a");
order12.setMsg("子单-2");
orderList.add(order12);
Order order13 = new Order();
order13.setId("a");
order13.setMsg("支付-3");
orderList.add(order13);
Order order14 = new Order();
order14.setId("a");
order14.setMsg("推送-4");
orderList.add(order14);
Order order21 = new Order();
order21.setId("b");
order21.setMsg("主单-1");
orderList.add(order21);
Order order22 = new Order();
order22.setId("b");
order22.setMsg("子单-2");
orderList.add(order22);
Order order31 = new Order();
order31.setId("c");
order31.setMsg("主单-1");
orderList.add(order31);
Order order32 = new Order();
order32.setId("c");
order32.setMsg("子单-2");
orderList.add(order32);
Order order33 = new Order();
order33.setId("c");
order33.setMsg("支付-3");
orderList.add(order33);
//设置消息进入到指定的消息队列中
for(final Order order : orderList){
Message msg = new Message("orderTopic",order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
int mqIndex = order.getId().hashCode() % list.size();
return list.get(mqIndex);
}
}, null);
System.out.println(result);
}
producer.shutdown();
消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
consumer.setNamesrvAddr("192.168.200.169:9876");
consumer.subscribe("orderTopic","*");
//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服务,转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt msg : list){
System.out.println(Thread.currentThread().getName()+" 消息:"+new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("接收消息服务已开启运行");
3.6) 事务消息
3.6.1) 正常事务过程与事务补偿过程
3.6.2)事务消息状态
- 提交状态:允许进入队列,此消息与非事务消息无区别
- 回滚状态:不允许进入队列,此消息等同于未发送过
- 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
- 注意:事务消息仅与生产者有关,与消费者无关
3.6.3) 发送事务消息
测试方法
public static void main(String[] args) throws Exception {
//事务补偿过程
unknowMsg();
//事务提交过程
//commitMsg();
//事务回滚过程
//rollbackMsg();
}
事务补偿过程
/**
* 事务补偿过程
* @throws Exception
*/
public static void unknowMsg() throws Exception {
//事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.200.169:9876");
//添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//中间状态
return LocalTransactionState.UNKNOW;
}
//事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("事务补偿过程执行");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("topic12", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("返回结果:" + result);
//事务补偿过程必须保障服务器在运行过程中,否则将无法进行正常的事务补偿
// producer.shutdown();
}
事务提交过程
/**
* 事务提交过程
*
* @throws Exception
*/
public static void commitMsg() throws Exception {
//事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.200.169:9876");
//添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//事务提交状态
return LocalTransactionState.COMMIT_MESSAGE;
}
//事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic8", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("返回结果:" + result);
producer.shutdown();
}
事务回滚
/**
* 事务回滚
*
* @throws Exception
*/
public static void rollbackMsg() throws Exception {
//事务消息使用的生产者是TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("group1");
producer.setNamesrvAddr("192.168.200.169:9876");
//添加本地事务对应的监听
producer.setTransactionListener(new TransactionListener() {
//正常事务过程
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//事务回滚状态
return LocalTransactionState.ROLLBACK_MESSAGE;
}
//事务补偿过程
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return null;
}
});
producer.start();
Message msg = new Message("topic9", ("事务消息:hello rocketmq ").getBytes("UTF-8"));
SendResult result = producer.sendMessageInTransaction(msg, null);
System.out.println("返回结果:" + result);
producer.shutdown();
}
4) 集群搭建
4.1)集群结构与特征分析
4.1.1)集群结构
- 单机
- 一个broker提供服务(宕机后服务瘫痪)
集群
- 多个broker提供服务(单机宕机后消息无法及时被消费)
多个master多个slave
- master到slave消息同步方式为同步(较异步方式性能略低,消息无延迟)
master到slave消息同步方式为异步(较同步方式性能略高,数据略有延迟)

4.1.2)工作流程
步骤1:NameServer启动,开启监听,等待broker、producer与consumer连接
- 步骤2:broker启动,根据配置信息,连接所有的NameServer,并保持长连接
- 步骤2补充:如果broker中有现存数据, NameServer将保存topic与broker关系
- 步骤3:producer发信息,连接某个NameServer,并建立长连接
- 步骤4:producer发消息
- 步骤4.1若果topic存在,由NameServer直接分配
- 步骤4.2如果topic不存在,由NameServer创建topic与broker关系,并分配
- 步骤5:producer在broker的topic选择一个消息队列(从列表中选择)
- 步骤6:producer与broker建立长连接,用于发送消息
步骤7:producer发送消息
comsumer工作流程同producer
4.2)双主双从集群搭建
4.2.1) 集群架构
4.2.2)搭建过程
1) 配置主机名称(未来就可以根据主机名找到对应的服务器了)vim /etc/hosts
# nameserver
192.168.184.128 rocketmq-nameserver1
192.168.184.129 rocketmq-nameserver2
# broker
192.168.184.128 rocketmq-master1
192.168.184.129 rocketmq-slave2
192.168.184.129 rocketmq-master2
192.168.184.128 rocketmq-slave1
配置完毕后重启网卡,应用配置systemctl restart network
2) 关闭防火墙
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service
3) 配置jdk
详见 2.1.1) 步骤1
4) 配置服务器环境
将rocketmq 解压至跟目录 /
# 解压
unzip rocketmq-all-4.5.2-bin-release.zip
# 修改目录名称
mv rocketmq-all-4.5.2-bin-release rocketmq
vim /etc/profile
#set rocketmq
ROCKETMQ_HOME=/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
配置完毕后重启网卡,应用配置source /etc/profile
5) 创建集群服务器的数据存储目录
主节点创建四个目录/ 从节点四个目录
mkdir /rocketmq/store
mkdir /rocketmq/store/commitlog
mkdir /rocketmq/store/consumequeue
mkdir /rocketmq/store/index
mkdir /rocketmq-slave/store
mkdir /rocketmq-slave/store/commitlog
mkdir /rocketmq-slave/store/consumequeue
mkdir /rocketmq-slave/store/index
注意master与slave如果在同一个虚拟机中部署,需要将存储目录区分开
6) 修改配置
不同的节点,应该修改不同的配置,文件夹也应该不一样
cd r/ocketmq/conf/2m-2s-sync
vim broker-a.proerties
#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store-slave
#commitLog 存储路径
storePathCommitLog=/rocketmq/store-slave/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store-slave/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store-slave/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store-slave/checkpoint
#abort 文件存储路径
abortFile=/rocketmq
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128
检查启动内存 (nameserver 和broker 均需要修改)
vim /rocketmq/bin/runbroker.sh
vim /rocketmq/bin/runserver.sh
# 开发环境配置 JVM Configuration
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
启动(bin 目录)
nohup sh mqnamesrv &
nohup sh mqbroker -c ../conf/2m-2s-syncbroker-a.properties &
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties &
5) rocketmq-console
rocketmq-console是一款基于java环境开发的(springboot)的管理控制台工具
获取地址:https://github.com/apache/rocketmq-externals
6) 高级特性
6.1) 持久化与持久化介质
6.1.1)数据库持久化
ActiveMQ 使用了数据库的消息存储
缺点:数据库瓶颈将成为MQ瓶颈
6.1.2)文件系统持久化
不用数据库,直接用文件存储
(RocketMQ/Kafka/RabbitMQ)
6.2)顺序写与零拷贝
6.2.1)MQ 高效的消息存储与读写方式
- SSD(Solid State Disk)
- 随机写(100KB/s)
- 顺序写(600MB/s)1秒1部电影
6.2.2)零拷贝技术
1) 通过启动时初始化话文件大小来保证 占用固定的磁盘空间,保证磁盘读写速度
2) 零拷贝”技术
数据传输由传统的4次复制简化成3次复制(如下图),减少1次复制过程
Java语言中使用MappedByteBuffer类实现了该技术
要求:预留存储空间,用于保存数据(1G存储空间起步)
6.3)消息存储结构
消息数据存储区域
topic
queueId
message
消费逻辑队列
minOffset
maxOffset
consumerOffset
索引
key索引
创建时间索引
……
6.4) 刷盘机制
6.4.1) 同步刷盘
1)生产者发送消息到MQ,MQ接到消息数据
2)MQ挂起生产者发送消息的线程
3)MQ将消息数据写入内存
4)内存数据写入硬盘
5)磁盘存储后返回SUCCESS
6)MQ恢复挂起的生产者线程
7)发送ACK到生产者
6.4.2) 异步刷盘
1)生产者发送消息到MQ,MQ接到消息数据
2)MQ将消息数据写入内存
3)发送ACK到生产者
--等消息量多了--
4)内存数据写入硬盘
6.4.3) 同步刷盘/ 异步刷盘 优缺点对比
同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)
6.4.4) 配置方式
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
6.5)高可用与主从复制方案
6.5.1)高可用方案
nameserver
nameserver ,通过无状态+全服务器注册 来保证即使一个宕机了也能提供所有的服务
消息服务器
主从架构(2M-2S) ,即使又一台服务器宕机, 服务依旧可以正常提供
注意: master 一旦宕机,slave 只提供消费服务,不能写入新的消息(slave 不会升级为master)
消息生产(开发人员写代码时保障)
生产者将相同的topic绑定到多个group组,保障master挂掉后,其他master仍可正常进行消息接收
消息消费
RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙时候,自动切换由slave承担数据读取的工作
6.5.2)主从复制方案
同步复制
master接到消息后,先复制到slave,然后反馈给生产者写操作成功
优点:数据安全,不丢数据,出现故障容易恢复
缺点:影响数据吞吐量,整体性能低
异步复制
master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave
优点:数据吞吐量大,操作延迟低,性能高
缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失
配置(配置在启动时 -c 指定的配置文件中 broker.conf)
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
6.6)负载均衡
- Producer负载均衡
内部实现了不同broker集群中对同一topic对应消息队列的负载均衡 - Consumer负载均衡
平均分配
循环平均分配6.7)消息重试
当消息消费后未正常返回消费成功的信息将启动消息重试机制6.7.1) 顺序消息重试
当消费者消费消息失败后,RocketMQ会自动进行消息重试(每次间隔时间为 1 秒) 注意:应用会出现消息消费被阻塞的情况,因此,要对顺序消息的消费情况进行监控,避免阻塞现象的发生
6.7.2) 无序消息重试
无序消息包括普通消息、定时消息、延时消息、事务消息 无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费 为保障无序消息的消费,MQ设定了合理的消息重试间隔时长
6.8)死信队列
6.8.1)概念
当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message) 死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
6.8.2) 死信队列特征
``` - 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
- 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
- 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
<a name="piVcX"></a> ### 6.8.3)死信队列中消息特征
- 不会被再次重复消费
- 死信队列中的消息有效期为3天,达到时限后将被清除
在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费 ```<a name="HdPmQ"></a> ### 6.8.4) 死信处理
6.9)重复消费与消息幂等
6.9.1)重复消费原因
1 生产者发送了重复的消息
网络闪断
生产者宕机
2 消息服务器投递了重复的消息
网络闪断
3 动态的负载均衡过程
网络闪断/抖动
broker重启
订阅方应用重启(消费者)
客户端扩容
客户端缩容
6.9.2)消息幂等
对同一条消息,无论消费多少次,结果保持一致,称为消息幂等性
6.9.3)解决方案
- 使用业务id作为消息的key
- 在消费消息时,客户端对key做判定,未使用过放行,使用过抛弃
注意:messageId由RocketMQ产生,messageId并不具有唯一性,不能作用幂等判定条件