序言

rocketmq 支持顺序消息,本文将介绍:

  1. 是全局顺序还是分区顺序?
    1. 会出现短暂乱序吗?此时如何处理?
  2. 有序是以发送方角度还是接收方角度?
  3. 如何发送和消费顺序消息?
  4. 有序是如何实现的?是服务端实现还是消费端实现?

一、rocket mq 顺序消息特性

1. 全局顺序 VS 分区顺序

metaq只支持分区顺序

顺序消息类型分为两种:全局顺序和分区顺序。

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

  • 分区顺序:对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

2. 注意事项

  • 顺序消息暂不支持广播模式。
  • 同一个 Producer ID 或者 Consumer ID 只能对应一种类型的 Topic,即不能同时用于顺序消息和无序消息的收发。
  • 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
  • 基于上述实现,当服务端进行扩缩容时,队列数变化,会出现短暂的乱序可能

二、rocket mq 顺序消息使用

1. 发送方

发送方和无序消息的区别是在发送时选择 MessageQueue。

  1. public class OrderedProducer {
  2. public static void main(String[] args) throws Exception {
  3. //Instantiate with a producer group name.
  4. MQProducer producer = new DefaultMQProducer("example_group_name");
  5. //Launch the instance.
  6. producer.start();
  7. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  8. for (int i = 0; i < 100; i++) {
  9. int orderId = i % 10;
  10. //Create a message instance, specifying topic, tag and message body.
  11. Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
  12. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  13. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  14. @Override
  15. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  16. Integer id = (Integer) arg;
  17. int index = id % mqs.size();
  18. return mqs.get(index);
  19. }
  20. }, orderId);
  21. System.out.printf("%s%n", sendResult);
  22. }
  23. //server shutdown
  24. producer.shutdown();
  25. }
  26. }

2. 接收方

接收方和无序消息的区别时注册 MessageListenerOrderly

  1. public class OrderedConsumer {
  2. public static void main(String[] args) throws Exception {
  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");
  4. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  5. consumer.subscribe("TopicTest", "TagA || TagC || TagD");
  6. consumer.registerMessageListener(new MessageListenerOrderly() {
  7. AtomicLong consumeTimes = new AtomicLong(0);
  8. @Override
  9. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
  10. ConsumeOrderlyContext context) {
  11. context.setAutoCommit(false);
  12. System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
  13. this.consumeTimes.incrementAndGet();
  14. if ((this.consumeTimes.get() % 2) == 0) {
  15. return ConsumeOrderlyStatus.SUCCESS;
  16. } else if ((this.consumeTimes.get() % 3) == 0) {
  17. return ConsumeOrderlyStatus.ROLLBACK;
  18. } else if ((this.consumeTimes.get() % 4) == 0) {
  19. return ConsumeOrderlyStatus.COMMIT;
  20. } else if ((this.consumeTimes.get() % 5) == 0) {
  21. context.setSuspendCurrentQueueTimeMillis(3000);
  22. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
  23. }
  24. return ConsumeOrderlyStatus.SUCCESS;
  25. }
  26. });
  27. consumer.start();
  28. System.out.printf("Consumer Started.%n");
  29. }
  30. }

三、rocket mq 顺序消息实现

1. 发送端

1.1 顺序消息发送时序图

2.1 rocket-mq 的有序消息实现 - 图1

1.2 无序消息发送时序图

2.1 rocket-mq 的有序消息实现 - 图2 有序和无序的区别是如何选择 MessageQueue,有序消息是通过 MessageQueueSelector 和 arg 进行选择,而无序消息是通过轮询容灾策略选择。

2. 服务端

2.1 服务端处理时序图

2.1 rocket-mq 的有序消息实现 - 图3 todo 继续深入设计 mq 的存储结构,暂时忽略

3. 消费端

3.1 顺序消息消费

2.1 rocket-mq 的有序消息实现 - 图4

参考

  1. 什么是顺序消息