1.消息队列
消息即是信息的载体。为了让消息发送者和消息接收者都能够明白消息所承载的信息,需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议(JMS)。
消息从发送者到接收者的方式也有两种。一种称为即时消息通讯,即消息从一端发出后立即就可以达到另一端,这种方式具体实现是RPC(当然单纯的http通讯也满足这个定义);另一种方式称为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现是消息队列。
消息队列是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。
优缺点
- 解耦。传统模式系统间耦合性太强,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码。基于此用消息队列的话A系统只管把消息发到消息队列,其他需要这个消息的来订阅就可以了。

- 异步。传统模式一些非必要业务逻辑以同步方式运行,太耗费时间。基于此使用消息队列后,用户请求数据发送给消息队列后立即返回,再由消息队列消费者进程从消息队列中获取数据,异步写入数据库。线程池也可实现异步,但是每次新增一个接口需要重新发布联调,耦合度高,出现问题不易排查。


- 削峰。传统模式并发量大时,所有请求直接怼到数据库,造成数据库连接异常。基于此引入消息队列后,即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。

- 系统可用性降低:MQ挂掉情况(高可用)
- 系统复杂性增加:需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
- 数据一致性问题:万一消息的消费者并没有正确消费消息,就会导致数据不一致(分布式事务)
JMS
Java消息服务(Java Message Service)是一个消息服务的标准,一套规范的JAVA API 接口,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。JMS不是消息队列,更不是某种消息队列协议。消费模型
JMS具有两种通信模式:(点对点)和(发布/订阅模式)
点对点模式
点对点模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
发布订阅模式
包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。对象模型
ConnectionFactory创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。
Destination消息生产者的消息发送目标或者说消息消费者的消息来源。Destination实际上是两种类型的对象:Queue、Topic。
ConnectionConnection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。
SessionSession是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。
消息的生产者消息生产者由Session创建,并用于将消息发送到Destination。同样消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
消息消费者消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
MessageListener消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。2.消息队列选型
| 特性 | ActiveMQ | RabbitMQ | RocketMQ | kafka | | —- | —- | —- | —- | —- | | 开发语言 | java | erlang | java | scala | | 单机吞吐量 | 万级 | 万级 | 10万级 | 10万级,配合大数据类系统进行实时计算与日志采集 | | topic数量对吞吐量的影响 | | | topic达几百、几千,吞吐量有较小幅度降低,同等机器下可支撑大量topic | topic从几十到几百,吞吐量大幅度降低,同等机器下尽量保证topic数量不要过多,如果要支撑大规模topic,横向水平扩展 | | 时效性 | ms级 | us级 | ms级 | ms级以内 | | 可靠性 | 较低概率丢失数据 | 基本不丢 | 参数优化配置可达0丢失 | 参数优化配置可达0丢失 | | 可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) | | 功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低;管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。 |
中小型软件公司数据量没那么大,不能对rocketmq定制化开发,rocketmq排除;消息中间件应首选功能比较完备,kafka排除;RabbitMQ开源,社区活跃,erlang语言天生具备高并发的特性,可能国内没几个能定制化开发erlang的程序员。综述建议选ActiveMQ。
大型软件公司具备搭建分布式环境,也具备足够大的数据量,也可对rocketMQ进行定制化开发;ActiveMQ和RabbitMQ这两者因为吞吐量还有GitHub的社区活跃度的原因,并且在部署方式上前两者也是大不如后面两个天然分布式架构;至于kafka,如果业务场景有日志采集功能,肯定是首选kafka。综述建议根据具体业务场景在rocketMq和kafka之间二选一。
没有最好的技术,只有最适合的技术。
3.RocketMQ
概述
RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,前身是MetaQ,是阿里参考Kafka特点研发的一个队列模型的消息中间件,后开源给apache基金会成为了apache的顶级开源项目,具有高性能、高可靠、高实时、分布式特点。
发展历程
2011年:阿里巴巴在研究了Kafka的整体机制和架构设计之后,基于Kafka的设计使用Java进行了完全重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题。
2012年:阿里巴巴开源其自研的第三代分布式消息中间件——RocketMQ。
2016年11月:阿里将RocketMQ捐献给Apache软件基金会,正式成为孵化项目。
2017年2月20日:RocketMQ正式发布4.0版本,专家称新版本适用于电商领域,金融领域,大数据领域,兼有物联网领域的编程模型。
其实在阿里巴巴内部围绕着RocketMQ内核打造了三款产品,分别是MetaQ、Notify和Aliware MQ。
这三者分别采用了不同的模型,MetaQ主要使用了拉模型,解决了顺序消息和海量堆积问题;Notify主要使用了推模型,解决了事务消息;而云产品Aliware MQ则是提供了商业化的版本。
核心模块
- rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息
- rocketmq-client:提供发送、接受消息的客户端API。
- rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
- rocketmq-common:通用的一些类,方法,数据结构等。
- rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议。
- rocketmq-store:消息、索引存储等。
- rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!(一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件)。
-
4.架构
主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
NameServer
NameServer是一个功能齐全的服务器,其角色类似Dubbo中的Zookeeper,但NameServer与Zookeeper相比更轻量。主要是因为每个NameServer节点是无状态的,节点相互之间无通信,是互相独立的,可以横向扩展。
NameServer主要负责对于源数据管理,包括了对Topic和路由信息的管理。有一点需要注意,Broker向NameServer发心跳时会带上当前自己所负责的所有Topic信息。如果Topic个数太多(万级别)导致一次心跳中Topic数据几十兆,网络情况差的话传输失败导致NameServer误认为Broker心跳失败。(NameServer每隔10秒,扫描所有还存活的Broker连接,若某个连接2分钟内没有发送心跳数据,则断开连接)
每个 Broker 在启动时会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息(30s),Consumer 也会定时(30s)获取 Topic 路由信息。
各个邮局的管理机构Broker
消息中转角色,负责存储消息,转发消息,同时提供Push/Pull接口来将消息发送给Consumer。Broker是具体提供业务的服务器,单个Broker节点与所有NameServer节点保持长连接及心跳,并会定时(30s)将Topic信息注册到NameServe。(底层通信和连接都是基于Netty实现)
邮局Producer
消息生产者,一般由业务系统负责产生消息。Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
RocketMQ 提供了三种方式发送消息:同步、异步和单向 同步发送:消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
- 异步发送:发送方发出数据后不等接收方发回响应,接着发送下个数据包。一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
- 单向发送:只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。
Consumer
消息消费者,一般由后台系统负责异步消费。Consumer也由用户部署,支持PUSH和PULL两种消费请求,支持集群消费和广播消息,提供实时的消息订阅机制。
RocketMQ 提供了两种消费请求:PUSH、PULL
- Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要拉取到消息用户应用就会启动消费,主动消费型。
- Push:推送型消费者(Push Consumer)封装了消息拉取、消费进度和其他内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现,被动消费类型。从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。
消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)
- 默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例,默认分配算法是AllocateMessageQueueAveragely;另外一种平均算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式。
- 如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
- 广播消费消息会发给消费者组中的每一个消费者进行消费。
消费同一类消息的多个 Consumer 实例组成一个消费者组
收信者
Message、Message Queue、Topic、Offset、Tag
Topic(主题)可以看做消息的规类,是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等。Producer将消息发往指定的Topic,Consumer订阅该Topic就可以收到这条消息。
Topic 与broker、生产者和消费者关系是多对多。
Tag(标签)可以看作子主题,是消息的第二级类型,用于为用户提供额外的灵活性。使用标签同一业务模块不同目的的消息就可以用相同 Topic 、不同 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag 。
Message Queue(消息队列),主题被划分为一个或多个子主题,即消息队列。一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下所有队列将消息发出去。
Message(消息)是要传输的信息,使用MessageId唯一识别。一条消息必须有一个主题(Topic),也可以拥有一个可选的标签(Tag)和额处的键值对。用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。
Offset (偏移量)是存储消息时会为每个Topic下的每个Queue生成一个消息索引文件,每个Queue都对应一个Offset记录当前Queue中消息条数。
通信流程
NameService启动,Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。
Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
5.持久化
Broker 在消息存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中。
存储介质
- 关系型数据库DBApache下开源的另外一款MQ—ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。
- 文件系统目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。
消息存储与发送
高性能磁盘顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍。RocketMQ的消息用顺序写保证了消息存储的速度。
数据复制过程
通过使用mmap的方式,可以省去向用户态的内存复制提高速度(零拷贝)。这种机制在Java中是通过MappedByteBuffer实现的,一次只能映射1.5~2G 的文件至用户态的虚拟内存。存储结构
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每 个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。IndexFile是为了消息查询提供了一种通过key或时间区间来查询消息的方法。
刷盘机制
两种刷盘策略
同步刷盘:当数据成功写到内存中之后立刻刷盘(同步),在保证消息写到磁盘也成功的前提下返回写成功状态。保证了数据的可靠性,效率较低。
异步刷盘 :数据写入内存后,直接返回成功状态。异步将内存中的数据持久化到磁盘上。提高系统的吞吐量,不能保证数据的可靠性。
消息复制方式
两种复制策略
同步复制: 当数据成功写到内存中Master节点之后立刻同步到Slave中,当Slave也成功的前提下返回写成功状态。数据安全性高,性能低一点。
异步复制: 当数据成功写到内存中Master节点之后,直接返回成功状态,异步将Master数据存入Slave节点。数据可能丢失,性能高一点
线上采用同步复制 + 异步刷盘。6.高可用
Namesrv 需要部署多个节点,以保证 Namesrv 的高可用。多个 Namesrv 之间不会有数据的同步,通过 Broker 向多个 Namesrv 写。
多个 Broker 可以形成一个 Broker 分组。每个 Broker 分组存在一个 Master 和多个 Slave 节点。Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。Master 节点可提供读和写功能,Slave 节点可提供读功能。Master 节点会不断发送新的 CommitLog 给 Slave节点,Slave 节点不断上报本地的 CommitLog 已经同步到的位置给 Master 节点。 Broker 集群和集群之间不存在通信与数据同步。
Producer 自身在应用中,所以无需考虑高可用。Producer 配置多个 Namesrv 列表从而保证 Producer 和 Namesrv 的连接高可用。
Consumer 需要部署多个节点以保证 Consumer 自身的高可用。当相同消费者分组中有新的 Consumer 上线,或者老的 Consumer 下线,会重新分配 Topic 的 Queue 到目前消费分组的 Consumer。Consumer 配置多个 Namesrv 列表从而保证 Consumer 和 Namesrv 的连接高可用。
7.消息重试
默认重试次数:Product默认是2次,而Consumer默认是16次。
重试时间间隔:Product是立刻重试,而Consumer是有一定时间间隔的。顺序消息重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时应用会出现消息消费被阻塞的情况。因此在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。无序消息重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。重试次数
消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:
| 第几次重试 | 与上次重试的间隔时间 | 第几次重试 | 与上次重试的间隔时间 |
|---|---|---|---|
| 1 | 10 秒 | 9 | 7 分钟 |
| 2 | 30 秒 | 10 | 8 分钟 |
| 3 | 1 分钟 | 11 | 9 分钟 |
| 4 | 2 分钟 | 12 | 10 分钟 |
| 5 | 3 分钟 | 13 | 20 分钟 |
| 6 | 4 分钟 | 14 | 30 分钟 |
| 7 | 5 分钟 | 15 | 1 小时 |
| 8 | 6 分钟 | 16 | 2 小时 |
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
配置方式
消费失败后,重试配置方式
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
- 返回 Action.ReconsumeLater (推荐)
- 返回 Null
- 抛出异常
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//处理消息
doConsumeMessage(message);
//方式1:返回 Action.ReconsumeLater,消息将重试
return Action.ReconsumeLater;
//方式2:返回 null,消息将重试
return null;
//方式3:直接抛出异常, 消息将重试
throw new RuntimeException(“Consumer Message exceotion”);
}
}
消费失败后,不重试配置方式
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
- 自定义消息最大重试次数消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:Properties properties = new Properties();
//配置对应 Group ID 的最大消息重试次数为 20 次
properties.put(PropertyKeyConst.MaxReconsumeTimes,”20”);
Consumer consumer =ONSFactory.createConsumer(properties);- 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
- 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
获取消息重试次数
消费者收到消息后,可按照如下方式获取消息的重试次数:
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
//获取消息的重试次数
System.out.println(message.getReconsumeTimes());
return Action.CommitMessage;
}
}
8.死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
死信消息具有以下特性
- 不会再被消费者正常消费。
- 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。
- 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
- 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
9.消息幂等(不被重复消费)
幂等操作的特点是任意多次执行所产生的影响均与一次执行的影响相同。即同样的参数调用我这个接口,调用多少次结果都是一个。
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
网络原因闪断,ACK返回失败、开发人员代码Bug等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息,再次将该消息分发给其他消费者。一般使用业务端逻辑保持幂等性。以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置
- 添加一张消息消费记录表,表字段加上唯一约束条件(UNIQUE),消费完之后往表里插入一条数据。因为加了唯一约束条件,第二次保存时MySQL 就会报错,通过数据库可以限制重复消费。
- 以redis为例,给消息分配一个全局id,只要消费过该消息,将
以K-V形式写入redis.那消费者开始消费前,先去redis中查询有没有消费记录即可。 系统并发很高,那么可以使用 Redis 或者 zookeeper 分布式锁对消息 id 加锁,然后使用上面的几个方法进行幂等性控制。
10.如何处理消息丢失(可靠性传输)
可靠性传输要从三个角度来分析:
生产者丢数据:
- 消息队列丢数据:开启持久化磁盘的配置
- 消费者丢数据:手动确认消息
11.如何保证消息的顺序性
生产者消费者一般需要保证顺序消息的话,可能就是一个业务场景下的,比如订单的创建、支付、发货、收货。一个topic下有多个队列,为了保证发送有序,RocketMQ提供了MessageQueueSelector队列选择机制,有三种实现:
使用Hash取模法,让同一个订单发送到同一个队列中,再使用同步发送,只有同个订单的创建消息发送成功,再发送支付消息。这样,我们保证了发送有序。
SelectMessageQueueByRandoom,SelectMessageQueueByHash,SelectMessageQueueByMachineRoom
一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 订单生成,它拿完后,消费者2去queue拿到的是 订单支付。拿的顺序是没毛病了,但关键是先拿到不代表先消费完它。会存在虽然消费者1先拿到订单生成,但由于网络等原因消费者2比消费者1真正的先消费消息。
采用分段锁锁单个Queue而不是锁整个Broker,这样就可以保证局部顺序消费了。消费者1去Queue拿 订单生成,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。然后下一个消费者去拿到 订单支付 同样锁住当前Queue,这样来真正保证对同一个Queue能够真正意义上的顺序消费,而不仅仅是顺序取出。
顺序消息暂不支持广播模式;
12.事务消息
RocketMQ是一种最终一致性的分布式事务,即它保证的是消息最终一致性。Half Message(半消息)
是指暂不能被Consumer消费的消息。Producer 已经把消息成功发送到了 Broker 端,但此消息被标记为暂不能投递状态,处于该种状态下的消息称为半消息。需要 Producer对消息的二次确认后,Consumer才能去消费它。
半消息作用:
1)可以先确认 Brock服务器是否正常 ,如果半消息都发送失败了 那说明Brock挂了。
2)可以通过半消息来回查事务,如果半消息发送成功后一直没有被二次确认,那么就会回查事务状态。消息回查
由于网络闪段,生产者应用重启等原因。导致 Producer 端一直没有对 Half Message(半消息) 进行 二次确认。这是Broker服务器会定时扫描长期处于半消息的消息,会动询问 Producer端 该消息的最终状态(Commit或者Rollback),该消息即为消息回查。
- 执行本地事务的时候,由于突然网络等原因一直没有返回执行事务的结果(commit或者rollback)导致最终返回UNKNOW,那么就会回查。
本地事务执行成功后,返回Commit进行消息二次确认的时候的服务挂了,在重启服务那么这个时候在broker端 它还是个Half Message(半消息),这也会回查。
交互流程
A服务先发送个Half Message给Brock端,消息中携带 B服务 即将要+100元的信息。
- 当A服务知道Half Message发送成功后,那么开始第3步执行本地事务。
- 执行本地事务(会有三种情况1、执行成功。2、执行失败。3、网络等原因导致没有响应)
- 如果本地事务成功,那么Producer像Broker服务器发送Commit,这样B服务就可以消费该message。
- 如果本地事务失败,那么Producer像Broker服务器发送Rollback,那么就会直接删除上面这条半消息。
如果因为网络等原因迟迟没有返回失败还是成功,那么会执行RocketMQ的回调接口来进行事务的回查。
13.如何处理消息积压
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。
- 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
- 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
- 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
- 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
14.回溯消费
回溯消费是指Consumer已经消费成功的消息,由于业务上的需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度。
例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。参考
消息队列MQ-面试题
消息队列面试题要点
RocketMQ
MQ随笔
