RocketMQ手记—顺序消息与延迟消息

顺序消息

顺序消息分为全局有序和分区有序

全局有序

当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序, 称为全局有序。

  1. 保证 a,b,c消息能通过这个queue顺序到达消费者。

在创建queue时有三种方式:

  1. 在代码中创建Producer时,可以指定其自动创建的Topic的Queue数量
  2. 在RocketMQ可视化控制台中手动创建Topic时指定Queue数量
  3. 使用mqadmin命令手动创建Topic时指定Queue数量

分区有序

如果有多个queue参与时,只保证当时queue中的消息顺序称为分区有序

备注

  • 一般来说可以使用消息模queue来指定消息传入哪个queue中
  • 但是,这样的做法和hashmap一样会存在取模相同的问题,此时会导致一个消费者消费了两个不同顺序的消息
  • 此时,最好需要一个key来进行判别,且该key可以随着消息进行传送,用消息key来当做key最好
  • 当消费者进行消费时先判断该消息key值,不属于自己消费的消息则不予消费

延时消息

    当消息写入到Broker后,在指定的时长后才可被消费处理的消息,称为延时消息。 采用RocketMQ的延时消息可以实现定时任务的功能,而无需使用定时器。典型的应用场景是,电商交 易中超时未支付关闭订单的场景,12306平台订票超时未支付取消订票的场景。

场景

  • 在电商平台中,订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系 统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。
  • 如果未完 成,则取消订单,将商品再次放回到库存;
  • 如果完成支付,则忽略。
  • 在12306平台中,车票预订成功后就会发送一条延迟消息。
  • 这条消息将会在45分钟后投递给后台 业务系统(Consumer),后台业务系统收到该消息后会判断对应的订单是否已经完成支付。
  • 如 果未完成,则取消预订,将车票再次放回到票池;
  • 如果完成支付,则忽略。

延时等级

延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的。延时等级定义在 RocketMQ服务端的MessageStoreConfig类中的如下变量中: 即,若指定的延时等级为3,则表示延迟时长为10s,即延迟等级是从1开始计数的。

当然,如果需要自定义的延时等级,可以通过在broker加载的配置中新增如下配置(例如下面增加了1 天这个等级1d)。

配置文件在RocketMQ安装目录下的conf目录中。

 messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d

延迟消息实现原理

  • Producer将消息发送到Broker后,Broker会首先将消息写入到commitlog文件,然后需要将其分发到相 应的consumequeue。
  • 不过,在分发之前,系统会先判断消息中是否带有延时等级。
  • 若没有,则直接正 常分发;

若有则需要经历一个复杂的过程

  1. 修改消息的Topic为SCHEDULE_TOPIC_XXXX 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId 目录与consumequeue文件(如果没有这些目录与文件的话)。
  2. 延迟等级delayLevel与queueId的对应关系为queueId = delayLevel -1 需要注意,在创建queueId目录时,并不是一次性地将所有延迟等级对应的目录全部创建完毕, 而是用到哪个延迟等级创建哪个目录 修改消息索引单元内容。
  3. 索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的 Hash值。现修改为消息的投递时间。
  4. 投递时间是指该消息被重新修改为原Topic后再次被写入到 commitlog中的时间。
  5. 投递时间 = 消息存储时间 + 延时等级时间。消息存储时间指的是消息 被发送到Broker时的时间戳。
  6. 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中 SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息是如何排序的? 是按照消息投递时间排序的。一个Broker中同一等级的所有延时消息会被写入到consumequeue 目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等 级时间是相同的。那么投递时间就取决于于消息存储时间了。即按照消息被发送到Broker的时 间进行排序的。

投递延时消息

  1. 投递延时消息 Broker内部有⼀个延迟消息服务类ScheuleMessageService,其会消费SCHEDULE_TOPIC_XXXX中的消 息,即按照每条消息的投递时间,将延时消息投递到⽬标Topic中。
  2. 不过,在投递之前会从commitlog 中将原来写入的消息再次读出,并将其原来的延时等级设置为0,即原消息变为了一条不延迟的普通消 息。然后再次将消息投递到目标Topic中。
  3. ScheuleMessageService在Broker启动时,会创建并启动一个定时器TImer,用于执行相应的定时 任务。
  4. 系统会根据延时等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟 等级消息的消费与投递。每个TimerTask都会检测相应Queue队列的第一条消息是否到期。若第 一条消息未到期,则后面的所有消息更不会到期(消息是按照投递时间排序的);
  5. 若第一条消 息到期了,则将该消息投递到目标Topic,即消费该消息。 将消息重新写入commitlog 延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog,并再次形成新的消息索 引条目,分发到相应Queue。
  6. 这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类 ScheuleMessageService