使用SelectMessageQueueByHash 保证顺序有序
说明
这里演示使用SelectMessageQueueByHash ,也可以自定义消息队列选择器:
自定义消息队列选择器的博客地址是 : https://zjj1994.blog.csdn.net/article/details/120875465
Comsumer
package org.apache.rocketmq.example.ordermessage;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.consumer.ConsumeFromWhere;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.subscribe("OrderTopicTest", "*");/***消费者端注册一个监听器,MessageListenerOrderly ,* 从队列里面拿*/consumer.registerMessageListener(new MessageListenerOrderly() {/***** @param msgs* @param context* @return*/@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {System.out.println("收到消息内容 " + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS; // 返回成功消费标识}});// MessageListenerConcurrently 是乱拿的,保证不了消费顺序// 这样是保证不了最终消费顺序的。// consumer.registerMessageListener(new MessageListenerConcurrently() {// @Override// public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// for(MessageExt msg:msgs){// System.out.println("收到消息内容 "+new String(msg.getBody()));// }// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// }// });consumer.start();System.out.printf("Consumer Started.%n");}}
Producer
package org.apache.rocketmq.example.ordermessage.demo02;import org.apache.rocketmq.client.exception.MQBrokerException;import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;import org.apache.rocketmq.common.message.Message;import org.apache.rocketmq.remoting.common.RemotingHelper;import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;import java.util.UUID;/*** 使用 SelectMessageQueueByHash*/public class Producer2 {public static void main(String[] args) throws UnsupportedEncodingException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");producer.start();for (int i = 0; i < 500; i++) { // 生成500个订单//生成50个订单,这里订单号就用uuid来代替了, 实际情况下每个公司的订单id生成方案是不一样的String orderId = UUID.randomUUID().toString();// 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来for (int j = 1; j <= 4; j++) {// 实例化一个消息Message msg =new Message("OrderTopicTest", "orderTag", "KEY" + orderId,("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));/** MessageQueueSelector 是消息队列选择器, 这个作用是选择消息发送到哪个队列里面去* @param 参数1 : msg 你发送的消息* @param 参数2 : selector 消息Queue选择器,* @param 参数3: args 传给 消息队列选择器 使用的参数,SelectMessageQueueByHash 中select方法的第三个参数就是这个值传过去的*/SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderId);System.out.printf("%s%n", sendResult);}}producer.shutdown();} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}}
测试说明
Comsumer控制台结果
随便查看一个Comsumer控制台,
你会发现接收的消息是有序的, 即使中间夹杂着别的消息, 但是总体上都是有序的.
比如说 
a2c588f8-1ec3-4de1-997b-545c43b15a33 步骤:1和
a2c588f8-1ec3-4de1-997b-545c43b15a33 步骤:2
中间有个 8cf0133a-9e8d-44b6-96d9-4f30286f761a 步骤:4
但是 a2c588f8-1ec3-4de1-997b-545c43b15a33 整体是有序的
SelectMessageQueueByHash源码
public class SelectMessageQueueByHash implements MessageQueueSelector {/**mqs 是MQ里面所有的队列msg是消息arg是 producer.send 方法第三个参数*/@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {// 获取参数的hashcode值,int value = arg.hashCode();// 防止出现负数,因为hashcode算出来的结果可能是负数,取个绝对值,这也是我们平时开发中需要注意到的点if (value < 0) {value = Math.abs(value);}// 直接取余队列个数。value = value % mqs.size();//根据余数获取队列,这个队列就是消息要投递的队列return mqs.get(value);}}
arg参数说明
arg参数就是这里传过去的,.

