说明
这里使用的是自定义消息队列选择器保证消息的有序, 其实可以使用SelectMessageQueueByHash 来保证消息的有序,
使用SelectMessageQueueByHash 保证顺序有序的帖子看: https://zjj1994.blog.csdn.net/article/details/120883178
怎么保证全局有序?
保证全局有序有个最简单的方式,就是topic里面只有一个队列,这样就可以保证全局有序,但是有人会这样用么? 肯定不会的,因为这样用的话,性能吞吐量安全性都会非常的差.
所以都是保证局部有序,而不是全局有序.
保证顺序有序使用场景
订单支付,一个订单下来必须是有顺序的,比如说 必须先支付 然后再营销 ,然后进入 物流发订单 , 这样顺序是不能乱的.
或者聊天功能: 我们所有人发的顺序需要保证有序,不能说是你先发的,然后再别人后面才到.
所以这就是要保证局部有序.
案例
生产者发送10个订单,每个订单里面有六个步骤,每个步骤都会发送一个消息过去, 这六个步骤要求得是顺序性的.
consumer要保证消费的顺序是一样的.
所有的MQ只能保证在一个queue里面消息是有序的, 如果是Kafka的话就是Partition.
RocketMQ保证的是局部有序,而不是全局有序
什么是顺序局部有序? 顺序局部有序需要你的生产者和消费者一起配合才能做到.
消费者
注意: 注册Listener的时候 ,registerMessageListener 需要使用 MessageListenerOrderly ,这种取出来的消息是有序的. 如果用的是别的MessageListener,比如说MessageListenerConcurrently就不能保证有序了.
package org.apache.rocketmq.example.ordermessage;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe("OrderTopicTest", "*");/***消费者端注册一个监听器,MessageListenerOrderly ,* 从队列里面拿*/consumer.registerMessageListener(new MessageListenerOrderly() {/***** @param msgs* @param context* @return*/@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {System.out.println("收到消息内容 " + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS; // 返回成功消费标识}});// MessageListenerConcurrently 是乱拿的,保证不了消费顺序// 这样是保证不了最终消费顺序的。// consumer.registerMessageListener(new MessageListenerConcurrently() {// @Override// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// for(MessageExt msg:msgs){// System.out.println("收到消息内容 "+new String(msg.getBody()));// }// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// }// });consumer.start();System.out.printf("Consumer Started.%n");}}
自定义消息队列选择器
里面的注解我写的很详细了, 就不多做解释了, 
另外需要注意一个坑,就是String字符串在hashCode的时候可能会出现负数,这个需要注意.
为什么会string hashCode 完了之后会出现负数? 看这个博客 :https://zjj1994.blog.csdn.net/article/details/120875055
package org.apache.rocketmq.example.ordermessage;import org.apache.rocketmq.client.producer.MessageQueueSelector;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;/*** 自定义消息队列选择器*/public class MyMessageQueueSelector implements MessageQueueSelector {/*** 选择队列** @param mqs 这个topic下面所有messageQueue* @param msg 发送的消息* @param arg 这个参数是 生产者producer 调用 send 方法的时候第三个参数传过来的* @return*/@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// orderId取模MessageQueue的Size,获取到一个索引,这样就保证了同一个Order里面的消息存到了同一个队列String orderId = (String) arg; // 获取传过来的orderId// 进行hash操作//下面 & Integer.MAX_VALUE 的目的是复制hashCode出来的是负数int orderIdHashCode = orderId.hashCode() & Integer.MAX_VALUE;// hashCode值和队列的长度进行取余数,取出来一个整数int index = orderIdHashCode % mqs.size();// 通过上面取余数获取一个队列, 往这个队列里面投递消息,这样就能保证MessageQueue messageQueue = mqs.get(index);return messageQueue;}}
生产者
生成500个订单, 每个订单都有4个步骤 ,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付的消息 在 下单消息 之前过来, 这样就出现业务bug了.
package org.apache.rocketmq.example.ordermessage;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;import java.util.UUID;public class Producer {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();for (int i = 0; i < 500; i++) { // 生成500个订单//生成50个订单,这里订单号就用uuid来代替了, 实际情况下每个公司的订单id生成方案是不一样的String orderId = UUID.randomUUID().toString();// 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来for (int j = 1; j <= 4; j++) {// 实例化一个消息Message msg =new Message("OrderTopicTest", "orderTag", "KEY" + orderId,("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));//实例化 自己编写的消息队列选择器MyMessageQueueSelector myMessageQueueSelector = new MyMessageQueueSelector();/** MessageQueueSelector 是消息队列选择器, 这个作用是选择消息发送到哪个队列里面去* @param 参数1 : msg 你发送的消息* @param 参数2 : selector 消息Queue选择器,* @param 参数3: args 传给 消息队列选择器 使用的参数,myMessageQueueSelector 中select方法的第三个参数就是这个值传过去的*/SendResult sendResult = producer.send(msg, myMessageQueueSelector, orderId);System.out.printf("%s%n", sendResult);}}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}}
执行结果
启动两个消费者
先启动两个消费者, 勾选允许多个实例,这样一个消费者就能启动两个了
一套代码启动了两个实例出来
启动生产者生产消息
查看消费者控制台
随便查看一个Consumer的控制台发现

随便看几个,发现步骤顺序都是有序的.
特殊情况!部分有序

8cf0133a-9e8d-44b6-96d9-4f30286f761a 订单中间还夹杂着一个 a2c588f8-1ec3-4de1-997b-545c43b15a33 步骤:1的消息 ,但是 8cf0133a-9e8d-44b6-96d9-4f30286f761a 步骤顺序还是有序的
虽然说 同一个订单id 的消息没有挨在一起消费, 但是 他们的顺序还是有序的. 这就是部分有序.
a2c588f8-1ec3-4de1-997b-545c43b15a33  订单也是有序的
