使用SelectMessageQueueByHash 保证顺序有序

说明

这里演示使用SelectMessageQueueByHash ,也可以自定义消息队列选择器:
自定义消息队列选择器的博客地址是 : https://zjj1994.blog.csdn.net/article/details/120875465

Comsumer

  1. package org.apache.rocketmq.example.ordermessage;
  2. import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
  3. import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
  4. import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
  5. import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
  6. import org.apache.rocketmq.client.exception.MQClientException;
  7. import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
  8. import org.apache.rocketmq.common.message.MessageExt;
  9. import java.util.List;
  10. public class Consumer {
  11. public static void main(String[] args) throws MQClientException {
  12. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
  13. consumer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  14. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
  15. consumer.subscribe("OrderTopicTest", "*");
  16. /**
  17. *消费者端注册一个监听器,MessageListenerOrderly ,
  18. * 从队列里面拿
  19. */
  20. consumer.registerMessageListener(new MessageListenerOrderly() {
  21. /***
  22. *
  23. * @param msgs
  24. * @param context
  25. * @return
  26. */
  27. @Override
  28. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
  29. context.setAutoCommit(true);
  30. for (MessageExt msg : msgs) {
  31. System.out.println("收到消息内容 " + new String(msg.getBody()));
  32. }
  33. return ConsumeOrderlyStatus.SUCCESS; // 返回成功消费标识
  34. }
  35. });
  36. // MessageListenerConcurrently 是乱拿的,保证不了消费顺序
  37. // 这样是保证不了最终消费顺序的。
  38. // consumer.registerMessageListener(new MessageListenerConcurrently() {
  39. // @Override
  40. // public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  41. // for(MessageExt msg:msgs){
  42. // System.out.println("收到消息内容 "+new String(msg.getBody()));
  43. // }
  44. // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  45. // }
  46. // });
  47. consumer.start();
  48. System.out.printf("Consumer Started.%n");
  49. }
  50. }


Producer

  1. package org.apache.rocketmq.example.ordermessage.demo02;
  2. import org.apache.rocketmq.client.exception.MQBrokerException;
  3. import org.apache.rocketmq.client.exception.MQClientException;
  4. import org.apache.rocketmq.client.producer.DefaultMQProducer;
  5. import org.apache.rocketmq.client.producer.SendResult;
  6. import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
  7. import org.apache.rocketmq.common.message.Message;
  8. import org.apache.rocketmq.remoting.common.RemotingHelper;
  9. import org.apache.rocketmq.remoting.exception.RemotingException;
  10. import java.io.UnsupportedEncodingException;
  11. import java.util.UUID;
  12. /**
  13. * 使用 SelectMessageQueueByHash
  14. */
  15. public class Producer2 {
  16. public static void main(String[] args) throws UnsupportedEncodingException {
  17. try {
  18. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  19. producer.setNamesrvAddr("zjj101:9876;zjj102:9876;zjj103:9876");
  20. producer.start();
  21. for (int i = 0; i < 500; i++) { // 生成500个订单
  22. //生成50个订单,这里订单号就用uuid来代替了, 实际情况下每个公司的订单id生成方案是不一样的
  23. String orderId = UUID.randomUUID().toString();
  24. // 每个订单有4个步骤,比如说 下单 支付 确认收货 ,评价 ,每个步骤都会发送一个消息过去,并且这个消息不允许顺序乱,也就是 不能 支付在下单之前过来
  25. for (int j = 1; j <= 4; j++) {
  26. // 实例化一个消息
  27. Message msg =
  28. new Message("OrderTopicTest", "orderTag", "KEY" + orderId,
  29. ("订单Id:" + orderId + " 步骤:" + j).getBytes(RemotingHelper.DEFAULT_CHARSET));
  30. /*
  31. * MessageQueueSelector 是消息队列选择器, 这个作用是选择消息发送到哪个队列里面去
  32. * @param 参数1 : msg 你发送的消息
  33. * @param 参数2 : selector 消息Queue选择器,
  34. * @param 参数3: args 传给 消息队列选择器 使用的参数,SelectMessageQueueByHash 中select方法的第三个参数就是这个值传过去的
  35. */
  36. SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderId);
  37. System.out.printf("%s%n", sendResult);
  38. }
  39. }
  40. producer.shutdown();
  41. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }

测试说明

执行两个Comsumer
然后再执行Producer2

Comsumer控制台结果

随便查看一个Comsumer控制台,
你会发现接收的消息是有序的, 即使中间夹杂着别的消息, 但是总体上都是有序的.
比如说
a2c588f8-1ec3-4de1-997b-545c43b15a33 步骤:1和
a2c588f8-1ec3-4de1-997b-545c43b15a33 步骤:2
中间有个 8cf0133a-9e8d-44b6-96d9-4f30286f761a 步骤:4
但是 a2c588f8-1ec3-4de1-997b-545c43b15a33 整体是有序的
image.png

SelectMessageQueueByHash源码

  1. public class SelectMessageQueueByHash implements MessageQueueSelector {
  2. /**
  3. mqs 是MQ里面所有的队列
  4. msg是消息
  5. arg是 producer.send 方法第三个参数
  6. */
  7. @Override
  8. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  9. // 获取参数的hashcode值,
  10. int value = arg.hashCode();
  11. // 防止出现负数,因为hashcode算出来的结果可能是负数,取个绝对值,这也是我们平时开发中需要注意到的点
  12. if (value < 0) {
  13. value = Math.abs(value);
  14. }
  15. // 直接取余队列个数。
  16. value = value % mqs.size();
  17. //根据余数获取队列,这个队列就是消息要投递的队列
  18. return mqs.get(value);
  19. }
  20. }

arg参数说明

arg参数就是这里传过去的,.

RocketMQ使用SelectMessageQueueByHash 保证顺序有序 - 图2