序言
rocketmq 支持顺序消息,本文将介绍:
- 是全局顺序还是分区顺序?
- 会出现短暂乱序吗?此时如何处理?
- 有序是以发送方角度还是接收方角度?
- 如何发送和消费顺序消息?
- 有序是如何实现的?是服务端实现还是消费端实现?
一、rocket mq 顺序消息特性
1. 全局顺序 VS 分区顺序
metaq只支持分区顺序!
顺序消息类型分为两种:全局顺序和分区顺序。
全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
分区顺序:对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
2. 注意事项
- 顺序消息暂不支持广播模式。
- 同一个 Producer ID 或者 Consumer ID 只能对应一种类型的 Topic,即不能同时用于顺序消息和无序消息的收发。
- 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
- 基于上述实现,当服务端进行扩缩容时,队列数变化,会出现短暂的乱序可能
二、rocket mq 顺序消息使用
1. 发送方
发送方和无序消息的区别是在发送时选择 MessageQueue。
public class OrderedProducer {public static void main(String[] args) throws Exception {//Instantiate with a producer group name.MQProducer producer = new DefaultMQProducer("example_group_name");//Launch the instance.producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;//Create a message instance, specifying topic, tag and message body.Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}//server shutdownproducer.shutdown();}}
2. 接收方
接收方和无序消息的区别时注册 MessageListenerOrderly
public class OrderedConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 3) == 0) {return ConsumeOrderlyStatus.ROLLBACK;} else if ((this.consumeTimes.get() % 4) == 0) {return ConsumeOrderlyStatus.COMMIT;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}
三、rocket mq 顺序消息实现
1. 发送端
1.1 顺序消息发送时序图
1.2 无序消息发送时序图
有序和无序的区别是如何选择 MessageQueue,有序消息是通过 MessageQueueSelector 和 arg 进行选择,而无序消息是通过轮询容灾策略选择。
