一、MQ基础

1.1 什么是消息队列

消息队列(Message Queue),从广义上讲是一种消息队列服务中间件,提供一套完整的消息生产、传递、消费的软件系统。

1.2 为什么需要消息队列

  • 异步:在非必要串行的地方实现并行化,从而缩短响应时间,提升系统性能。
  • 解耦:避免上游为对接多个下游时频繁地修改接口,降低系统间的耦合度。
  • 削峰填谷:为了避免大量的请求冲击后台服务,可以使用消息队列暂存请求,后台服务以最大处理能力消费请求,保证后台的安全性。

image.png image.png

带来的问题:

  • 系统可用性降低: 加入的消息队列的可用性影响系统整体可用性
  • 系统复杂性增加:要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

    1.3 消息队列选型

    image.png

    二、RocketMQ/ONS原理

    2.1 介绍

  • RocketMQ: 阿里开源的一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

  • ONS: 阿里商业版RocketMQ云服务。

    2.2 技术架构

    RocketMQ/ONS原理与使用 - 图4

    2.3 概念

    **Producer:负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
    Producer Group:同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
    Consumer:负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。

Consumer Group:同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。
Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。
Name Server:在消息队列 RocketMQ中提供命名服务,更新和发现 Broker 服务,相当于一个路由控制中心。

Message:消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。

消息Topic、Tag

Topic:表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
Tag:为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

2.4 集群工作流程

RocketMQ/ONS原理与使用 - 图5**

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  2. 启动Broker,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台(随机选择)建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

2.5 消息模型

队列(Queue)模型:消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。
image.png
发布 - 订阅(Pub/Sub)模型:发布者(Publisher)将消息发送到主题(Topic)中,订阅者(Subscriber)在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。
image.png
RocketMQ 的消息模型:RocketMQ 在Topic下面增加了队列的概念。每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。
image.png
Kafka消息模型和RocketMQ一样,区别是Kafka中queue对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。

2.5 消息类型

普通消息:消息队列 RocketMQ 版中无特性的消息,区别于有特性的定时和延时消息、顺序消息和事务消息。
顺序消息:允许消息消费者按照消息发送的顺序对消息进行消费。
定时和延时消息:允许消息生产者对指定消息进行定时(延时)投递,最长支持 40 天。
事务消息:实现类似 X/Open XA 的分布事务功能,以达到事务最终一致性状态。

2.6 消息生产原理

2.6.1 消息发送方式

普通消息发送方式:

  • 同步发送:需要Broker返回确认信息,主要运用在比较重要一点消息传递/通知等业务:
  • 异步发送:需要Broker返回确认信息,通常用于对发送消息响应时间要求更高/更快的场景:
  • 单向发送:不需要Broker返回确认信息,适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

顺序消息发送:
对于指定的一个 Topic,所有消息根据 sharding key 选择队列,为同步发送。

2.6.2 可靠消息发送:

Producer发送阶段

  • 同步或异步发送结果确认:返回SEND_OK表示发送到Broker成功。
  • 重试机制:发送消息如果失败或者超时,则重新发送。可通过setRetryTimesWhenSendFailed、setRetryTimesWhenSendAsyncFailed设置重试次数。
  • Broker提供多master模式:即使某台broker宕机了,保证消息可以投递到另外一台正常的broker上。

Broker处理阶段:
默认情况下,消息只要到了 Broker 端,将会优先保存到内存中,然后立刻返回确认响应给生产者。随后 Broker 定期批量的将一组消息从内存异步刷入磁盘。
这种方式减少 I/O 次数,可以取得更好的性能,但是如果发生机器掉电,异常宕机等情况,消息还未及时刷入磁盘,就会出现丢失消息的情况。
若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。
为了保证可用性,Broker 通常采用一主(master)多从(slave)部署方式。为了保证消息不丢失,消息还需要复制到 slave 节点。
若需要严格保证消息不丢失: master同步刷盘,同步复制到slave.

2.7 消息消费原理

2.7.1 消费模式

RocketMQ对于消费者消费消息有两种模式:

  • BROADCASTING:广播式消费,这种模式下,一个消息会被通知到每一个消费者(比如更新实例的本地缓存)。
  • CLUSTERING: 集群式消费,这种模式下,一个消息最多只会被投递到一个消费者上进行消费。消费者实例超过队列个数时,多余的实例不分配。

image.png
image.png

2.7.2 消息投递的方式:

Pull模式:当服务端收到这条消息后什么也不做,只是等着 Consumer 主动到自己这里来读,即 Consumer 这里有一个“拉取”的动作。拉取成功后处理数据,处理完成再次进行拉取,循环执行。缺点是如果不能很好的设置拉取的频率,时间间隔,过多的空轮询会对服务端造成较大的访问压力,数据的实时性也不能得到很好的保证。
Push模式:当 Producer 发出的消息到达后,服务端马上将这条消息投递给 Consumer。当服务端有数据立即通知客户端,这个策略依赖服务端与客户端之间的长连接,它具有高实时性、客户端开发简单等优点;同时缺点也很明显,比如:服务端需要感知与它建立链接的客户端,要实现客户端节点的发现,服务端本身主动推送,需要服务端对消息做额外的处理,以便能够及时将消息分发给客户端。
长轮询: RocektMQ使用了结合推拉模式两者优点的长轮询机制。它本质上还是拉模式,但服务端能够通过hold住请求的方式减少客户端对服务端的频繁访问,从而提高资源利用率及消息响应实时性。

2.7.3 消息过滤:

解决大量无意义流量的传输,即Broker只传给客户端需要的消息。RocketMQ支持Tag过滤、SQL92过滤、Filter Server过滤。
Tag过滤流程

  1. Producer发送带Tag消息
  2. Consumer订阅时设置Topic及Tag
  3. Broker保存订阅关系,做Tag过滤

2.7.4 可靠消费:

重试-死信机制:
集群消费(MessageModel.CLUSTERING)模式下,当消息消费失败,RocketMQ会通过消息重试机制重新投递消息,努力使该消息消费成功(广播消息是不会重试的)。

  • 普通消息重试:

当消费者消费该重试消息后,需要返回结果给broker,告知broker消费成功(ConsumeConcurrentlyStatus.CONSUME_SUCCESS)或者需要重新消费(ConsumeConcurrentlyStatus.RECONSUME_LATER)。
RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

  • 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  • 业务消费方返回null
  • 业务消费方主动/被动抛出异常


RocketMQ的重试时间窗,当消息需要重试时,会按照该规则进行重试。
可以在RocketMQ的broker.conf配置文件中配置Consumer侧重试次数及时间间隔, 配置如下
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是一个RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到死信队列(DLQ-Dead Letter Queue)。可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。应用可以监控死信队列来做人工干预。

  • 顺序消息重试:

顺序消息消费失败重试时,应用会出现消息消费被阻塞的情况(消息队列 RocketMQ 版会自动不断地进行消息重试((每次间隔时间为 1 秒)),重试最大值是Integer.MAX_VALUE)。
务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
当使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费。

Rebalance机制

2.7.5 消息重复

消息投递的QoS定义

  • 最多一次(At most once)
  • 至少一次(At least once)
  • 仅一次( Exactly once)

RocketMQ仅支持At Least Once。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。

  1. 消费端处理消息的业务逻辑保持幂等性最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 设置
  2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现,利用一张日志表(缓存)来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

2.7.6 消费进度保存机制


RocketMQ是以consumer group+queue为单位是管理消费进度的,以一个consumer offset标记这个这个消费组在这条queue上的消费进度(消息内容不删)。
每次消息消费成功后,本地的消费进度会被更新,然后由定时器定时同步到broker,以此持久化消费进度(如果客户端没有同步成功,该部分消息会重复投递)。
如果某已存在的消费组出现了新消费实例的时候,依靠这个组的消费进度,就可以判断第一次是从哪里开始拉取的。
Broker端消费进度存储

  1. {
  2. "offsetTable":{
  3. "TopicTest@pullConsumerGroupTest":{0:1578,1:1578,2:1578,3:1578}
  4. }
  5. }

TopicTest这个topic,pullConsumerGroupTest这个消费者组的各个队列的offset位点,其中队列0到1578 ,队列1到1578….。详情大家可以去broker的存储目录下找config/consumerOffset.json
**

Consumer启动消费位置
新Consumer Group:
CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前

已经消费过的Consumer Group:
从本消费组broker记录的消费进度(consumer offset)开始.

2.8 消息存储原理

消息堆积
消息队列RocketMQ版能支持10亿级别的消息堆积,不会因为消息堆积导致性能明显下降。
**

RockeMQ存储模型
消息持久化保存到磁盘中,且消费队列本身不保存消息本地,保存消息磁盘索引,通过FileChannel的MMAP机制实现内存映射,处理消息时能达到基本和内存相同的效率。设置同步复制和同步刷盘即可保存消息不丢失。

什么时候清理物理消息文件?
那消息文件到底删不删,什么时候删?
消息存储在CommitLog之后,的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息文件(CommitLog):
消息文件过期(默认72小时)(不管是否消费过),且到达清理时点(默认是凌晨4点),删除过期文件。
消息文件过期(默认72小时),且磁盘空间达到了水位线(默认75%),删除过期文件。
磁盘已经达到必须释放的上限(85%水位线)的时候,则开始批量清理文件(无论是否过期),直到空间充足。
注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的,broker会拒绝写入服务。

三、RocketMQ/ONS功能与使用

3.1 主要功能

3.1.1 顺序消息

消息有序指的是一类消息消费时,能按照发送的顺序来消费。
全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
分区顺序
RocketMQ/ONS原理与使用 - 图11
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
eg: 订单成交消息,多次成交,需要保证该订单的消息是有序的,但和其它订单消息可以不用保证顺序。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

1、生产端 同一orderID的订单取hashCode通过MessageQueueSelector放到同一个queue。

  1. // RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
  2. // RocketMQ默认提供了两种MessageQueueSelector实现:随机和Hash
  3. // 可以根据业务shardingKey实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
  4. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  5. @Override
  6. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object shardingKey) {
  7. int select = Math.abs(shardingKey.hashCode());
  8. if (select < 0) {
  9. select = 0;
  10. }
  11. return mqs.get(select % mqs.size());
  12. }
  13. }, orderId);

2、消费端 同一个queue取出消息的时候通过MessageListenerOrderly锁住整个queue,直到消费后再解锁。

  1. //MessageListenerOrderly
  2. consumer.registerMessageListener(new MessageListenerOrderly() {
  3. AtomicLong consumeTimes = new AtomicLong(0);
  4. @Override
  5. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  6. context.setAutoCommit(true);
  7. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
  8. this.consumeTimes.incrementAndGet();
  9. if ((this.consumeTimes.get() % 2) == 0) {
  10. return ConsumeOrderlyStatus.SUCCESS;
  11. } else if ((this.consumeTimes.get() % 3) == 0) {
  12. return ConsumeOrderlyStatus.ROLLBACK;
  13. } else if ((this.consumeTimes.get() % 4) == 0) {
  14. return ConsumeOrderlyStatus.COMMIT;
  15. } else if ((this.consumeTimes.get() % 5) == 0) {
  16. context.setSuspendCurrentQueueTimeMillis(3000);
  17. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  18. }
  19. return ConsumeOrderlyStatus.SUCCESS;
  20. }
  21. });

顺序消息缺陷

  • 顺序消息不支持异步发送方式,否则将无法严格保证顺序
  • 顺序消息消费暂不支持广播模式。
  • 消费顺序消息的并行度依赖于队列数量
  • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停

3.1.2 事务消息

事务消息概念

RocketMQ/ONS原理与使用 - 图12
以购物场景为例,用户购买商品,账户系统扣款 100 元的同时,需要保证在下游的会员服务中给该账户增加 100 积分。
以上过程会存在3个问题:
1. 扣款失败、通知增加积分成功;
2. 扣款成功、通知增加积分失败。
3. 扣款成功,并且通知成功了。但是增加积分的时候失败了。
(使用普通的处理方式,无论如何,都无法保证业务处理与消息发送两边的一致性,其根本的原因就在于:远程调用,结果最终可能为成功、失败、超时;而对于超时的情况,处理方最终的结果可能是成功,也可能是失败,调用方是无法知晓的。
eg: 扣款成功,消息存储成功,但是MQ处理超时,从而ACK确认失败,导致发送方本地事务回滚)
RocketMQ的事务消息解决的是问题1和问题2这种场景,也就是解决本地事务执行与消息发送的原子性问题。即解决Producer执行业务逻辑成功之后投递消息可能失败的场景。
而对于问题3这种场景,rocketmq提供了消费失败重试的机制。但是如果消费重试依然失败:人工解决。

事务消息的实现


RocketMQ/ONS原理与使用 - 图13
事务消息详细过程说明
1. Producer发送Half(prepare)消息到broker;
2. half消息发送成功之后执行本地事务;
3.(由用户实现)本地事务执行如果成功则返回commit,如果执行失败则返回roll_back。
4. Producer发送确认消息到broker(也就是将步骤3执行的结果发送给broker),这里可能broker未收到确认消息,下面分两种情况分析:
如果broker收到了确认消息:
- 如果收到的结果是commit,则broker视为整个事务过程执行成功,将消息下发给Conusmer端消费;
- 如果收到的结果是rollback,则broker视为本地事务执行失败,broker删除Half消息,不下发给consumer。
如果broker未收到了确认消息:
broker定时回查本地事务的执行结果(由用户实现);
如果本地事务已经执行则返回commit;如果未执行,则返回rollback;

3.1.3 延迟消息

基本概念:延迟消息是指生产者发送消息发送消息后,不能立刻被消费者消费,需要等待指定的时间后才可以被消费。
场景案例:用户下了一个订单之后,需要在指定时间内(例如30分钟)进行支付,在到期之前可以发送一个消息提醒用户进行支付。

RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的,如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。

ONS: 通过当前时间加延迟时间定时发送
// 10s后传递给consumer
message.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);

回溯消费

回溯消费是指Consumer已经消费成功的消息,由于业务上需求需要重新消费,要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。

消息查询

RocketMQ支持按照下面两种维度(“按照Message Id查询消息”、“按照Message Key查询消息”)进行消息查询。
按照MessageId查询消息
RocketMQ中的MessageId的长度总共有16字节,其中包含了消息存储主机地址(IP地址和端口),消息Commit Log offset。“按照MessageId查询消息”在RocketMQ中具体做法是:Client端从MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封装成一个RPC请求后通过Remoting通信层发送(业务请求码:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,读取消息的过程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的记录并解析成一个完整的消息返回。
按照Message Key查询消息
“按照Message Key查询消息”,主要是基于RocketMQ的IndexFile索引文件来实现的。RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。索引文件的具体结构如下:

3.2 最佳实践

3.2.1 Producer最佳实践

  • 一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤
  • 每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
  • 消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
  • 对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
  • send 消息方法,只要不抛异常,就代表发送成功。由于顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult 中的 status 字段不等于 SEND_OK,就应该尝试重试。
  • 某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息,提高发送性能。

    3.2.2 Topic和Tag最佳实践

  • 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。

  • 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
  • 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
  • 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。

3.2.3 Consumer最佳实践

  • 订阅关系一致:同一个Consumer Group订阅的Topic、Tag必须一致
  • 消费过程要做到幂等(即消费端去重)
  • 消费速度慢处理方式
  1. 先从是否有错误来看:检查是否是消费者出现了大量的消费错误,或者是某一个线程卡死,锁不释放等等
  2. 优化每条消息消费过程
  3. 增加消费者实例,但是同时记得要增加每个主题的队列数量。
  • 消费打印日志:如果消息量较少,建议在消费入口方法打印消息,方便后续排查问题。如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

3.2.4 ONS控制台

  • 消息查询:可以通过 Message ID、Message Key 或 Topic 的时间范围查询
  • 查看消费者状态:订阅关系是否一致、消费速度、消费延迟、堆积量
  • 消费轨迹查询:查看一条消息从生产者发送到消息队列 服务端,再到消费者消费处理,整个过程。
  • 查看订阅关系
  • 重置消费点:按需清除堆积的或不想消费的这部分消息再开始消费,或直接跳转到某个时间点消费该时间点之后的消息(不论是否消费过该时间点之前的消息)
  • 死信队列:3 天后会被自动删除,需尽快处理(重发或手动处理)

image.png


四、参考