消息队列是一种异步的服务间通信方式,是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
12.1 消息服务介绍和使用场景
什么是AMQP:
即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端中间件不同产品,不同的开发语言等条件的限制。
什么是JMS:
java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口,JMS是一种与厂商无关的API。
核心应用:
解耦:订单系统 >> 物流系统
异步:用户注册 >> 发送邮件,初始化信息
削峰:秒杀、日志处理
跨平台、多语言
分布式事务、最终一致性
RPC(Remote Procedure Call)远程过程调用,简单的理解是一个节点请求另一个节点提供的服务
RPC调用上下游对接、数据源变动、通知下属
12.2 主流消息中间件框架对比
- Apache ActiveMQ
Apache出品,历史悠久,支持多种语言的客户端和协议Java,NET,c++等,基于JMS Provider实现。吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用。 - Kafka
是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafaka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户行动),副本集机制,实现数据冗余,保持数据尽量不丢失,支持多个生产者和消费者。不支持批量和广播消息,运维难度大。 - RabbitMQ
是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端。。。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方便表现不错。使用Erlang开发,阅读和修改源码难度大 - RocketMQ
阿里开源的消息中间件,Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,性能强劲(零拷贝技术),支持海量堆积,支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤,延迟消息等,在阿里内部大规模使用,适合在电商、互联网金融等领域使用。因为是阿里内部从实践到产品的产物,因此里面很多接口、API并不是很普遍适用。
12.3 RocketMQ介绍
是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 支持分布式事务消息
12.4 RocketMQ核心概念
- 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。
消息生产者(Producer):
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
消息消费者(Consumer):
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
主题(Topic):
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
消息队列(Message Queue):
正真存放消息的地方,每一个主题下面会有多个队列
标签(Tag):
为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。
组(Group):
生产者组(Producer Group)
同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消费者组(Consumer Group)
同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。 - 部署结构
RocketMQ的服务端有两部分组成:- 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。 - 名字服务(Name Server)
名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。
- 代理服务器(Broker Server)

12.5 消息类型
普通消息
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中 每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息 的功能。
普通顺序消息
普通顺序消费模式下,消费者通过同一个消费队列收到的消息是有顺序的,不同消息队列 ,收到的消息则可能是无顺序的。- 严格顺序消息
严格顺序消息模式下,消费者收到的所有消息均是有顺序的。 - 延时消息
是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。 broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
· level == 0,消息为非延迟消息
· 1<=level<=maxLevel,消息延迟特定时间,例如level==1,延迟1s
· level > maxLevel,则level== maxLevel,例如level==20,延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入 特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有 相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消 息写入真实的topic。
需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。 - 事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义 到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
12.6 消息传播模式
- 集群模式(Clustering)
集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。 - 广播模式(Broadcasting)
广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
12.7 消息消费模式
- 拉取式消费(Pull Consumer)
Consumer消费的一种类型,应用通常主动调用Consumer的拉消息方法从Broker服务器拉消 息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。 - 推动式消费(Push Consumer)
Consumer消费的一种类型,该模式下Broker收到数据后会主动推送给消费端,该消费模式 一般实时性较高。
12.8 消息刷盘
- 同步刷盘:
如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回Producer 端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障, 但是性能上 会有较大影响,一般适用于金融业务应用该模式较多。 - 异步刷盘:
能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给 Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的 性能和吞吐量。
12.9 消息重试
Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。
Consumer消费消息 失败通常可以认为有以下几种情况:
1.由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不功,所以最好提供一种定时重试机制,即过10秒后再重试。
2.由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用sleep 30s,再消费下一条消息,这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+consumerGroup”的重试队列
(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的)
用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时, 重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。
12.10 消息重投
生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。
消息 重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。
如下方法可以设置消息重试策略:
retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送
retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。
当出现RemotingException、MQClientException和部分MQBrokerException时会重投。
retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。
retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非 SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。
12.11 死信队列
死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使 用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。
12.12 RocketMQ安装与启动
- 服务器安装
下载: https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip
解压:unzip rocketmq-all-4.7.1-bin-release.zip
根据机器内存情况设置运行内存:- 修改runserver.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m” - 修改runbroker.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g” - 修改tools.sh
JAVA_OPT=”${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m”
- 修改runserver.sh
运行 name server
nohup sh bin/mqnamesrv &
nohup sh bin/mqnamesrv >logs/namesrv.log 2>&1 &
查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
tail logs/namesrv.log
查看进程
ps -ef | grep rocketmq
运行broker
nohup sh bin/mqbroker -n localhost:9876 &
nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true >logs/mpbroker.log 2>&1
查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
tail logs/mpbroker.log
systemctl stop firewalld 关闭防火墙
关闭服务
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
要想完全清空数据,删除文件夹~/store,然后重启ggg
- 控制台安装
下载: git clone https://github.com/apache/rocketmq-externals.git
找到rocketmq-console/src/main/resources/application.properties 根据需求,修改配置
server.port=8081
name server地址
也可以不修改,在启动完console后,在控制台导航栏 - 运维 - NameSvrAddrList一栏设置
修改 pom.xml ,修改RocketMQ相关依赖的版本4.7.1
切换到控制台目录 cd rocketmq-console
mvn clean package -DskipTests
12.13 Springboot集成RocketMQ
导包
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>发送消息 ```java @Autowired private RocketMQTemplate rocketMQTemplate;
//1,发送同步消息
@GetMapping(“/send/{msg}”)
public String sendMessage(@PathVariable(“msg”) String msg) {
//携带tag与key
Message
//2,发送异步消息 @GetMapping(“/send2/{msg}”) public String sendMessage2(@PathVariable(“msg”) String msg) { rocketMQTemplate.asyncSend(“msgtest:order_pay”, MessageBuilder.withPayload(msg) .setHeader(“KEYS”, UUID.randomUUID().toString()) .build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(“发送成功: “ + sendResult); } @Override public void onException(Throwable e) { System.out.println(“发送失败: “ + e.getMessage()); } }); return “异步发送成功!”; }
//3,发送单项消息 @GetMapping(“/send3/{msg}”) public String sendMessage3(@PathVariable(“msg”) String msg) { rocketMQTemplate.sendOneWay(“msgtest:order_pay”, MessageBuilder.withPayload(msg).setHeader(“KEYS”, UUID.randomUUID().toString()).build()); return “发送单项消息成功!”; }
//4,发送顺序消息 @GetMapping(“/send5”) public String sendMessage5() { for(int i = 0; i<10;i++) { String msg = “movie1: “ + i + “ : “ + System.currentTimeMillis(); rocketMQTemplate.syncSendOrderly( “msgtest:order_pay”, MessageBuilder.withPayload(msg).setHeader(“KEYS”,UUID.randomUUID().toString()).build(), //hash值匹配一个消息队列的,所有消息的hash值现在一样,会放入一个队列中 “hashKey” ); } return “发送顺序消息成功!”; }
//5,发送延时消息 @GetMapping(“/send4/{msg}”) public String sendMessage4(@PathVariable(“msg”) String msg) { rocketMQTemplate.syncSend(“msgtest:order_pay”, MessageBuilder.withPayload(msg).setHeader(“KEYS”, UUID.randomUUID().toString()).build(), 3000, //延迟等级 2); return “发送延迟消息成功!”; }
3. 消费消息
```java
@Component
@RocketMQMessageListener(consumerGroup = "order-group",topic = "msgtest",selectorExpression = "order_pay"
//,consumeMode = ConsumeMode.ORDERLY //设置顺序(单线程)消费,默认是多线程异步消费
//,messageModel = MessageModel.CLUSTERING //消息传播模式 Clustering(集群模式)broadcasting(广播模式)
)
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(MessageDto message) {
System.out.println("消费消息");
System.out.println(message);
}
}
- 处理事务消息
```java package com.woniu.movie1.service.impl; import com.woniu.movie1.parameter.Userparameter; import com.woniu.movie1.service.UserService; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.concurrent.TimeUnit;//发送事务消息 @GetMapping("/send6") public String sendMessage6() { Message<String> msg = MessageBuilder.withPayload("登录成功了,我可以做事情了") .setHeader("KEYS", UUID.randomUUID().toString()) .setHeader("tansactionStatu",UUID.randomUUID().toString()) .build(); Userparameter userparameter=new Userparameter("tom","123");//创建一个前端对象 rocketMQTemplate.sendMessageInTransaction("msgtest:order_pay",msg,userparameter); return "发送半消息成功!"; }
@Service @RocketMQTransactionListener public class Consumer implements RocketMQLocalTransactionListener { @Resource private UserService userService; @Autowired private StringRedisTemplate redisTemplate; @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { //System.out.println(“开始执行本地事务”); Userparameter user =(Userparameter)arg;//强转为Userparameter String Statu = message.getHeaders().get(“tansactionStatu”).toString();//获取tansactionStatu try { //int a=1/0; userService.register(user);//执行业务 System.out.println(“本地事务执行成功,反馈成功消息”); redisTemplate.opsForValue().set(Statu,”1”,1, TimeUnit.HOURS);//执行成功,根据tansactionStat为key保存为1表示成功 return RocketMQLocalTransactionState.COMMIT;//注册成功
}catch (Exception e){
System.out.println("本地事务执行失败,反馈失败消息");
redisTemplate.opsForValue().set(Statu,"0",1, TimeUnit.HOURS);//执行失败,根据tansactionStat为key保存为0表示失败
return RocketMQLocalTransactionState.ROLLBACK;//注册失败
}
}
//回查 @Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { System.out.println(“开始回查”); String Statu = message.getHeaders().get(“tansactionStatu”).toString(); String s = (String) redisTemplate.opsForValue().get(Statu); if(s.equals(“1”)){ System.out.println(“开始反馈成功,反馈成功消息”); return RocketMQLocalTransactionState.COMMIT;//注册成功 } System.out.println(“开始失败反馈,反馈失败消息”); return RocketMQLocalTransactionState.ROLLBACK;//注册失败 } }
```
12.14 RocketMQ常见面试问题
- 如何避免消息重复消费?
RocketMQ不保证消息不重复,如果需要保证严格的不重复消费,需要自己在业务端去重,接口幂等性保障,消费端处理业务消息需要要吃幂等性,可以用Redis、关系数据库等来配合验证消息是否被消费。 - RocketMQ如何保证消息的可靠性传输
produce端: 不采用oneway发送,使用同步或则异步方式发送,做好重试,但是重试的Message key必须唯一。投递的日志需要保存,关键字段,投递时间,投递状态,重试次数,请求体,响应体
broker端: 双主双从架构,NamerServer需要多节点,同步双写、异步刷盘(同步刷盘则可靠性更高,但是性能差点,根据业务选择)
consumer端: 消息消费务必保留日志,即消息的元数据和消息体,做好幂等性处理
- 消息发生大量堆积应该怎么处理(大量堆积在broker里面)
线上故障了,怎么处理?
消息堆积了10小时,有几千万条消息待处理,现在怎么办?
修复consumer,然后慢慢消费?也需要几小时才可以消费完成,新的消息怎么办?
正确的姿势:
临时topic队列扩容,并提高消费能力,但是如果增加Consumer数量,堆积的topic里面的message queue 数量固定,过多的
consumer不能分配到message queue。编写临时的处理分发程序,从旧topic快速读取到临时的新topic中,新topic的queue数量扩容多倍,然后再启动更多的consumer进行再临时的新的topic里消费
